Flink Chapter 7: State Programming


Articles in This Series

Flink Chapter 1: Environment Setup
Flink Chapter 2: Basic Operations
Flink Chapter 3: Basic Operations (II)
Flink Chapter 4: Watermarks and Windows
Flink Chapter 5: Process Functions
Flink Chapter 6: Multi-Stream Operations
Flink Chapter 7: State Programming


Table of Contents

  • Articles in This Series
  • Preface
  • I. Keyed State (partitioned by key)
    • 1.KeyedStateTest.scala
    • 2.PeriodicPVExample.scala
    • 3.TwoStreamJoinExample.scala
    • 4.FakeWindowExample.scala
    • 5.AverageTimestampExample.scala
  • II. Operator State
    • 1.BufferingSinkExample.scala
  • III. Broadcast State
    • 1.BroadcastStateExample.scala
  • Summary


Preface

This time we study state programming in Flink, working through the Scala files listed in the table of contents above.


I. Keyed State (partitioned by key)

1.KeyedStateTest.scala

This file shows how to create and use the common keyed state types: ValueState, ListState, MapState, ReducingState, and AggregatingState.

package com.atguigu.chapter06

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.functions.{AggregateFunction, ReduceFunction, RichFlatMapFunction}
import org.apache.flink.api.common.state._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object KeyedStateTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)
      .keyBy(_.user)
      .flatMap(new MyFlatMap)

    env.execute()
  }

  class MyFlatMap extends RichFlatMapFunction[Event, String] {
    // Declare the state handles
    var valueState: ValueState[Event] = _
    var listState: ListState[Event] = _
    var mapState: MapState[String, Long] = _
    var reduceState: ReducingState[Event] = _
    var aggState: AggregatingState[Event, String] = _

    override def open(parameters: Configuration): Unit = {
      valueState = getRuntimeContext.getState(new ValueStateDescriptor[Event]("my-value", classOf[Event]))
      listState = getRuntimeContext.getListState(new ListStateDescriptor[Event]("my-list", classOf[Event]))
      mapState = getRuntimeContext.getMapState(new MapStateDescriptor[String, Long]("my-map", classOf[String], classOf[Long]))
      reduceState = getRuntimeContext.getReducingState(new ReducingStateDescriptor[Event]("my-reduce",
        new ReduceFunction[Event] {
          override def reduce(t: Event, t1: Event): Event = Event(t.user, t.url, t1.timestamp)
        },
        classOf[Event]))
      aggState = getRuntimeContext.getAggregatingState(new AggregatingStateDescriptor[Event, Long, String]("my-agg",
        new AggregateFunction[Event, Long, String] {
          override def createAccumulator(): Long = 0L
          override def add(in: Event, acc: Long): Long = acc + 1
          override def getResult(acc: Long): String = "current aggregate state: " + acc.toString
          // merge is only called for merging windows, so it is left unimplemented here
          override def merge(acc: Long, acc1: Long): Long = ???
        },
        classOf[Long]))
    }

    override def flatMap(in: Event, collector: Collector[String]): Unit = {
      // Operate on the states
      println("value state: " + valueState.value())
      valueState.update(in)
      println("value state: " + valueState.value())
      listState.add(in)
      println("------------")
      val count: Long = if (mapState.contains(in.user)) mapState.get(in.user) else 0
      mapState.put(in.user, count + 1)
      println(s"visit count of user ${in.user}: ${mapState.get(in.user)}")
      println("-------------")
      reduceState.add(in)
      println(reduceState.get())
      println("-------------")
      aggState.add(in)
      println(aggState.get())
      println("=================")
    }
  }
}


2.PeriodicPVExample.scala

A value state (ValueState) example with keyed state.

We key the stream by user id and count pv per user. Since we don't want to emit a result downstream every time the pv count increments, we register a timer that fires periodically and emits the pv count, which keeps the pressure on downstream operators low. Concretely, we keep a value state holding the registered timer's timestamp. When the timer fires and the result has been emitted downstream, we clear that state; when the next record arrives and finds no timer registered, it registers a new one and saves its timestamp back into the state.

package com.atguigu.chapter06

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object PeriodicPVExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)
      .keyBy(_.user)
      .process(new PeriodicPv)
      .print()

    env.execute()
  }

  class PeriodicPv extends KeyedProcessFunction[String, Event, String] {
    // Value state holding the current user's pv count
    lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", classOf[Long]))
    // Value state holding the timestamp of the registered timer
    lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("time-Ts", classOf[Long]))

    override def processElement(value: Event, ctx: KeyedProcessFunction[String, Event, String]#Context, out: Collector[String]): Unit = {
      // For every record, increment the count in state
      val count: Long = countState.value()
      countState.update(count + 1)
      // Register a timer that emits the result every 10 seconds
      if (timerTsState.value() == 0L) {
        ctx.timerService().registerEventTimeTimer(value.timestamp + 10 * 1000L)
        // Save the timer timestamp in state
        timerTsState.update(value.timestamp + 10 * 1000L)
      }
    }

    // Timer callback
    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, Event, String]#OnTimerContext, out: Collector[String]): Unit = {
      out.collect(s"pv of user ${ctx.getCurrentKey}: ${countState.value()}")
      // Clear the timer state so the next record registers a new timer
      timerTsState.clear()
    }
  }
}

The result is emitted every 10 s and keeps accumulating, a bit like a global window.
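If you would rather have each 10-second report start from zero, i.e. tumbling rather than accumulating semantics, a minimal variant (my sketch, not part of the original example) is to clear the count state inside PeriodicPv.onTimer as well:

// Inside class PeriodicPv: variant of onTimer that also resets the pv count
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, Event, String]#OnTimerContext, out: Collector[String]): Unit = {
  out.collect(s"pv of user ${ctx.getCurrentKey}: ${countState.value()}")
  countState.clear()   // reset the count, so each 10-second period is counted independently
  timerTsState.clear() // allow the next record to register a fresh timer
}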

3.TwoStreamJoinExample.scala

A list state (ListState) example.

SELECT * FROM A INNER JOIN B WHERE A.id = B.id;

A SQL statement like this should be used with caution, because Flink has to keep every record of stream A and stream B in state in order to join them. Here we reproduce the semantics of this statement by hand using list state.

package com.atguigu.chapter06

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TwoStreamJoinExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream1: DataStream[(String, String, Long)] = env.fromElements(
      ("a", "stream-1", 1000L),
      ("b", "stream-1", 2000L)
    ).assignAscendingTimestamps(_._3)

    val stream2: DataStream[(String, String, Long)] = env.fromElements(
      ("a", "stream-2", 3000L),
      ("b", "stream-2", 4000L)
    ).assignAscendingTimestamps(_._3)

    // Connect the two streams and join them
    stream1.keyBy(_._1)
      .connect(stream2.keyBy(_._1))
      .process(new TwoStreamJoin)
      .print()

    env.execute()
  }

  class TwoStreamJoin extends CoProcessFunction[(String, String, Long), (String, String, Long), String] {
    // List states holding the records that have already arrived on each stream
    lazy val stream1ListState: ListState[(String, String, Long)] = getRuntimeContext.getListState(
      new ListStateDescriptor[(String, String, Long)]("stream1-list", classOf[(String, String, Long)]))
    lazy val stream2ListState: ListState[(String, String, Long)] = getRuntimeContext.getListState(
      new ListStateDescriptor[(String, String, Long)]("stream2-list", classOf[(String, String, Long)]))

    override def processElement1(value1: (String, String, Long), ctx: CoProcessFunction[(String, String, Long), (String, String, Long), String]#Context, out: Collector[String]): Unit = {
      // Add the record to this stream's list state
      stream1ListState.add(value1)
      // Iterate over the other stream's arrived records and emit the matched pairs
      import scala.collection.convert.ImplicitConversions._
      for (value2 <- stream2ListState.get()) {
        out.collect(value1 + "=>" + value2)
      }
    }

    override def processElement2(value2: (String, String, Long), ctx: CoProcessFunction[(String, String, Long), (String, String, Long), String]#Context, out: Collector[String]): Unit = {
      // Add the record to this stream's list state
      stream2ListState.add(value2)
      // Iterate over the other stream's arrived records and emit the matched pairs
      import scala.collection.convert.ImplicitConversions._
      for (value1 <- stream1ListState.get()) {
        out.collect(value1 + "=>" + value2)
      }
    }
  }
}

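One caveat: the two list states above grow without bound. In practice you would usually cap how long buffered records live. A minimal sketch, assuming a one-hour retention is acceptable (note that Flink's state TTL is based on processing time):

import org.apache.flink.api.common.state.{ListStateDescriptor, StateTtlConfig}
import org.apache.flink.api.common.time.Time

// Expire buffered records one hour after they were last written
val ttlConfig: StateTtlConfig = StateTtlConfig
  .newBuilder(Time.hours(1))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build()

val descriptor = new ListStateDescriptor[(String, String, Long)]("stream1-list", classOf[(String, String, Long)])
descriptor.enableTimeToLive(ttlConfig) // must be called before the state handle is created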

4.FakeWindowExample.scala

A map state (MapState) example.

Map state is used much like a HashMap in Java. Here we use MapState to explore how windows are implemented under the hood: we fully simulate the behavior of a window, in this case a tumbling window, with map state alone. The goal is to compute the pv count of each url in each window. We implemented this requirement earlier by combining incremental aggregation with a full-window function; here we implement it again with MapState.

package com.atguigu.chapter06

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object FakeWindowExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)
      .keyBy(_.url)
      .process(new FakeWindow(10000L)) // 10-second tumbling window
      .print()

    env.execute()
  }

  class FakeWindow(size: Long) extends KeyedProcessFunction[String, Event, String] {
    // Map state: window start timestamp -> pv count of that window
    lazy val windowMapState: MapState[Long, Long] = getRuntimeContext.getMapState(
      new MapStateDescriptor[Long, Long]("window-pv", classOf[Long], classOf[Long]))

    override def processElement(value: Event, ctx: KeyedProcessFunction[String, Event, String]#Context, out: Collector[String]): Unit = {
      // Compute the start timestamp of the window this record falls into
      val start: Long = value.timestamp / size * size
      val end: Long = start + size
      // Register a timer that triggers the window computation
      ctx.timerService().registerEventTimeTimer(end - 1)
      // Update the state: count + 1
      if (windowMapState.contains(start)) {
        val pv: Long = windowMapState.get(start)
        windowMapState.put(start, pv + 1)
      } else {
        windowMapState.put(start, 1L)
      }
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, Event, String]#OnTimerContext, out: Collector[String]): Unit = {
      // Timer fired: emit the window result
      val start: Long = timestamp + 1 - size
      val pv: Long = windowMapState.get(start)
      out.collect(s"url: ${ctx.getCurrentKey} pv: $pv window: $start-${start + size}")
      // Destroy the window by removing its entry from map state
      windowMapState.remove(start)
    }
  }
}

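For comparison, the same per-url pv count over 10-second tumbling windows can be written with the built-in window API. This is a sketch under the assumption that stream is the timestamped DataStream[Event] from the example above; it is not part of the original post:

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

stream.keyBy(_.url)
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .aggregate(new AggregateFunction[Event, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(in: Event, acc: Long): Long = acc + 1 // count each click
    override def getResult(acc: Long): Long = acc
    override def merge(a: Long, b: Long): Long = a + b     // needed for merging windows
  })
  .print()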

5.AverageTimestampExample.scala

An aggregating state (AggregatingState) example.

A simple example: for the stream of user click events, emit the average timestamp once every 5 records. This resembles computing an average over a count window (CountWindow); here we implement it with a RichFlatMapFunction that holds an aggregating state.

package com.atguigu.chapter06

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.functions.{AggregateFunction, RichFlatMapFunction}
import org.apache.flink.api.common.state.{AggregatingState, AggregatingStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object AverageTimestampExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)

    stream.keyBy(_.user)
      .flatMap(new AvgTimestamp)
      .print("output")

    stream.print("input")

    env.execute()
  }

  class AvgTimestamp extends RichFlatMapFunction[Event, String] {
    // Aggregating state: accumulator (sum of timestamps, count) -> average timestamp
    lazy val avgTsAggState: AggregatingState[Event, Long] = getRuntimeContext.getAggregatingState(
      new AggregatingStateDescriptor[Event, (Long, Long), Long]("avg-ts",
        new AggregateFunction[Event, (Long, Long), Long] {
          override def createAccumulator(): (Long, Long) = (0L, 0L)
          override def add(in: Event, acc: (Long, Long)): (Long, Long) = (acc._1 + in.timestamp, acc._2 + 1)
          override def getResult(acc: (Long, Long)): Long = acc._1 / acc._2
          // merge is only called for merging windows, so it is left unimplemented here
          override def merge(acc: (Long, Long), acc1: (Long, Long)): (Long, Long) = ???
        },
        classOf[(Long, Long)]))

    // Value state holding how many records have arrived so far
    lazy val countState: ValueState[Long] = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("count", classOf[Long]))

    override def flatMap(in: Event, collector: Collector[String]): Unit = {
      avgTsAggState.add(in)
      // Update the count
      val count: Long = countState.value()
      countState.update(count + 1)
      if (countState.value() == 5) {
        collector.collect(s"average timestamp of ${in.user}: ${avgTsAggState.get()}")
        countState.clear()
      }
    }
  }
}

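The built-in count window expresses the same idea more directly. A sketch, again assuming stream is the DataStream[Event] from above; note that a count window resets its accumulator after every group of 5 records, whereas the AggregatingState in the example keeps accumulating because it is never cleared:

import org.apache.flink.api.common.functions.AggregateFunction

stream.keyBy(_.user)
  .countWindow(5) // fire once every 5 records per user
  .aggregate(new AggregateFunction[Event, (Long, Long), String] {
    override def createAccumulator(): (Long, Long) = (0L, 0L)
    override def add(in: Event, acc: (Long, Long)): (Long, Long) = (acc._1 + in.timestamp, acc._2 + 1)
    override def getResult(acc: (Long, Long)): String = s"average timestamp: ${acc._1 / acc._2}"
    override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) = (a._1 + b._1, a._2 + b._2)
  })
  .print()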

II. Operator State

1.BufferingSinkExample.scala

In the example below, a custom SinkFunction buffers records and flushes them downstream in one batch, using CheckpointedFunction to persist the buffer across checkpoints. The example demonstrates the even-split redistribution of list state.

package com.atguigu.chapter06

import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._

import scala.collection.mutable.ListBuffer

object BufferingSinkExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.addSource(new ClickSource)
      .assignAscendingTimestamps(_.timestamp)
      .addSink(new BufferingSink(10))

    env.execute()
  }

  // Custom SinkFunction
  class BufferingSink(threshold: Int) extends SinkFunction[Event] with CheckpointedFunction {
    // List state holding the buffered data across checkpoints
    var bufferedState: ListState[Event] = _
    // Local buffer
    val bufferedList: ListBuffer[Event] = ListBuffer[Event]()

    override def invoke(value: Event, context: SinkFunction.Context): Unit = {
      // Buffer the record
      bufferedList += value
      // Flush once the threshold is reached
      if (bufferedList.size == threshold) {
        // "Write" to the external system by printing
        bufferedList.foreach(data => println(data))
        println("=======flush complete==============")
        // Clear the buffer
        bufferedList.clear()
      }
    }

    override def snapshotState(context: FunctionSnapshotContext): Unit = {
      // Replace the previous snapshot with the current buffer
      bufferedState.clear()
      for (data <- bufferedList) {
        bufferedState.add(data)
      }
    }

    override def initializeState(context: FunctionInitializationContext): Unit = {
      bufferedState = context.getOperatorStateStore.getListState(
        new ListStateDescriptor[Event]("buffered-list", classOf[Event]))
      // If restoring from a failure, copy the state back into the local buffer
      if (context.isRestored) {
        import scala.collection.convert.ImplicitConversions._
        for (data <- bufferedState.get()) {
          bufferedList += data
        }
      }
    }
  }
}

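Note that snapshotState is only called when checkpointing is enabled, and the example never turns it on, so the buffer is never actually snapshotted. The missing line in main would look like this (the 10-second interval is an arbitrary choice of mine):

// Take a checkpoint every 10 seconds so snapshotState()/initializeState() come into play
env.enableCheckpointing(10 * 1000L)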

III. Broadcast State

1.BroadcastStateExample.scala

A behavior pattern matching example: a pattern (a pair of consecutive actions) is broadcast to all parallel tasks, and the keyed action stream is matched against it.

package com.atguigu.chapter06

import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Case classes
case class Action(userid: String, action: String)
case class Pattern(action1: String, action2: String)

object BroadcastStateExample {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // The action stream
    val actionStream: DataStream[Action] = env.fromElements(
      Action("Alice", "login"),
      Action("Alice", "pay"),
      Action("Bob", "login"),
      Action("Bob", "buy"))

    // The pattern stream carrying the behavior patterns to match
    val patternStream: DataStream[Pattern] = env.fromElements(
      Pattern("login", "pay"),
      Pattern("login", "buy"))

    // Descriptor for the broadcast state
    val patterns = new MapStateDescriptor[Unit, Pattern]("patterns", classOf[Unit], classOf[Pattern])
    val broadcastStream: BroadcastStream[Pattern] = patternStream.broadcast(patterns)

    // Connect the two streams and process them
    actionStream.keyBy(_.userid)
      .connect(broadcastStream)
      .process(new PatternEvaluation)
      .print()

    env.execute()
  }

  // Custom KeyedBroadcastProcessFunction
  class PatternEvaluation extends KeyedBroadcastProcessFunction[String, Action, Pattern, (String, Pattern)] {
    // Value state holding the user's previous action
    lazy val prevActionState: ValueState[String] = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("prev-action", classOf[String]))

    override def processElement(value: Action, ctx: KeyedBroadcastProcessFunction[String, Action, Pattern, (String, Pattern)]#ReadOnlyContext, out: Collector[(String, Pattern)]): Unit = {
      // Read the pattern from the broadcast state
      val pattern = ctx.getBroadcastState(
        new MapStateDescriptor[Unit, Pattern]("patterns", classOf[Unit], classOf[Pattern])).get(Unit)
      // Read the previous action from the value state
      val prevAction: String = prevActionState.value()
      if (pattern != null && prevAction != null) {
        if (pattern.action1 == prevAction && pattern.action2 == value.action) {
          out.collect((ctx.getCurrentKey, pattern))
        }
      }
      // Save the current action
      prevActionState.update(value.action)
    }

    override def processBroadcastElement(value: Pattern, ctx: KeyedBroadcastProcessFunction[String, Action, Pattern, (String, Pattern)]#Context, out: Collector[(String, Pattern)]): Unit = {
      val bcState: BroadcastState[Unit, Pattern] = ctx.getBroadcastState(
        new MapStateDescriptor[Unit, Pattern]("patterns", classOf[Unit], classOf[Pattern]))
      bcState.put(Unit, value)
    }
  }
}

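A small cleanup worth considering (my suggestion, not from the original post): the MapStateDescriptor is constructed three times with the same name and types. The descriptor passed to getBroadcastState has to match the one used in broadcast(), so factoring it into one shared value avoids accidental mismatches:

// One shared descriptor, reused by broadcast(), processElement and processBroadcastElement
val patternsDescriptor = new MapStateDescriptor[Unit, Pattern]("patterns", classOf[Unit], classOf[Pattern])

// e.g. in processElement:
// val pattern = ctx.getBroadcastState(patternsDescriptor).get(Unit)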


Summary

That's all for this note.

