Flink第五章:处理函数

news/2024/10/22 12:42:35/

系列文章目录

Flink第一章:环境搭建
Flink第二章:基本操作.
Flink第三章:基本操作(二)
Flink第四章:水位线和窗口
Flink第五章:处理函数


文章目录

  • 系列文章目录
  • 前言
  • 一、基本处理函数(ProcessFunction)
  • 二、按键分区处理函数(KeyedProcessFunction)
    • 1.处理时间定时服务
    • 2.事件时间定时服务
  • 三、TopN案例
    • 1.ProcessAllWindowFunction
    • 2.KeyedProcessFunction
  • 总结


前言

处理函数
简单来时就是比DataStream API更加底层的函数,能够处理更加复杂的问题
创建scala文件
在这里插入图片描述


一、基本处理函数(ProcessFunction)

我们用它来实现一个简单的Map操作,如果点击用户是Marry就输出用户名,是Alice就输出用户名+url

ProcessFunction.scala

package com.atguigu.chapter04import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collectorobject ProcessFunction {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream: DataStream[Event] = env.addSource(new ClickSource).assignAscendingTimestamps(_.timestamp)stream.process(new ProcessFunction[Event,String] {override def processElement(value: Event, ctx: ProcessFunction[Event, String]#Context, out: Collector[String]): Unit = {if (value.user.equals("Mary"))out.collect(value.user)else if (value.user.equals("Alice")){out.collect(value.user+value.url)}}}).print()env.execute()}
}

在这里插入图片描述

二、按键分区处理函数(KeyedProcessFunction)

在 Flink 程序中,为了实现数据的聚合统计,或者开窗计算之类的功能,我们一般都要先用 keyBy()算子对数据流进行“按键分区”,得到一个 KeyedStream。而只有在 KeyedStream 中,才支持使用 TimerService 设置定时器的操作。所以一般情况下,我们都是先做了 keyBy()分区之后,再去定义处理操作;代码中更加常见的处理函数是 KeyedProcessFunction。

1.处理时间定时服务

主要是在数据到达一段时间后进行数据操作

ProcessingTimeTimerTest.scala

package com.atguigu.chapter04import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collectorobject ProcessingTimeTimerTest {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream: DataStream[Event] = env.addSource(new ClickSource).assignAscendingTimestamps(_.timestamp)stream.keyBy(data=>true).process(new KeyedProcessFunction[Boolean,Event,String] {override def processElement(value: Event, ctx: KeyedProcessFunction[Boolean, Event, String]#Context, out: Collector[String]): Unit = {val currenTime: Long = ctx.timerService().currentProcessingTime()out.collect("数据到达,当前时间是:"+currenTime)//注册一个5秒之后的定时器ctx.timerService().registerProcessingTimeTimer(currenTime+5*1000)}//执行逻辑override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Boolean, Event, String]#OnTimerContext, out: Collector[String]): Unit = {out.collect("定时器触发,触发时间为:"+timestamp)}}).print()env.execute()}
}

在这里插入图片描述

2.事件时间定时服务

在数据产生一段时间后进行处理
EventTimeTimeTest.scala

package com.atguigu.chapter04import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collectorobject EventTimeTimeTest {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream: DataStream[Event] = env.addSource(new CustomSource).assignAscendingTimestamps(_.timestamp)stream.keyBy(data=>true).process(new KeyedProcessFunction[Boolean,Event,String] {override def processElement(value: Event, ctx: KeyedProcessFunction[Boolean, Event, String]#Context, out: Collector[String]): Unit = {val currenTime: Long = ctx.timerService().currentWatermark()out.collect(s"数据到达,当前时间是: $currenTime,当前数据时间戳是:${value.timestamp}")//注册一个5秒之后的定时器ctx.timerService().registerEventTimeTimer(ctx.timestamp()+5*1000)}//执行逻辑override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Boolean, Event, String]#OnTimerContext, out: Collector[String]): Unit = {out.collect("定时器触发,触发时间为:"+timestamp)}}).print()env.execute()}class CustomSource extends SourceFunction[Event]{override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {ctx.collect(Event("Mary","./home",1000L))Thread.sleep(5000)ctx.collect(Event("Mary","./home",2000L))Thread.sleep(5000)ctx.collect(Event("Mary","./home",6000L))Thread.sleep(5000)ctx.collect(Event("Mary","./home",6001L))Thread.sleep(5000)}override def cancel(): Unit = ???}
}

在这里插入图片描述

三、TopN案例

1.ProcessAllWindowFunction

TopNProcessAllWindowExample.scala

package com.atguigu.chapter04import com.atguigu.chapter02.Source.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorimport scala.collection.mutableobject TopNProcessAllWindowExample {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream: DataStream[Event] = env.addSource(new ClickSource).assignAscendingTimestamps(_.timestamp)// 直接开窗统计stream.map(_.url).windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new ProcessAllWindowFunction[String, String, TimeWindow] {override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {// 1.统计每个url的访问次数// 初始化Map (url,count)val urlCountMap: mutable.Map[String, Long] = mutable.Map[String, Long]()elements.foreach(data => urlCountMap.get(data) match {case Some(count) => urlCountMap.put(data, count + 1)case None => urlCountMap.put(data, 1)})//2.对数据进行排序提取val urlCountList: List[(String, Long)] = urlCountMap.toList.sortBy(-_._2).take(2)//3.包装信息打印输出val result = new mutable.StringBuilder()result.append(s"=========窗口: ${context.window.getStart} - ${context.window.getEnd}=======\n")for (i <- urlCountList.indices){val tuple: (String, Long) = urlCountList(i)result.append(s"浏览量TopN ${i+1}").append(s"url: ${tuple._1} ").append(s"浏览量是: ${tuple._2} \n")}out.collect(result.toString())}}).print()env.execute()}
}

在这里插入图片描述

2.KeyedProcessFunction

TopNkeyProcessFunctionExample.scala

package com.atguigu.chapter04import com.atguigu.chapter02.Source.{ClickSource, Event}
import com.atguigu.chapter03.UrlViewCount
import com.atguigu.chapter03.UrlViewCountExample.{UrlViewCountAgg, UrlViewCountResult}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collectorimport scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`
import scala.collection.mutableobject TopNkeyProcessFunctionExample {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val stream: DataStream[Event] = env.addSource(new ClickSource).assignAscendingTimestamps(_.timestamp)// 1.结合使用增量聚合函数和全窗口函数,统计每个url的访问频次val urlCountStream: DataStream[UrlViewCount] = stream.keyBy(_.url).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(new UrlViewCountAgg, new UrlViewCountResult)// 2.按照窗口信息进行分组提取,排序输出val resultStream: DataStream[String] = urlCountStream.keyBy(_.windowEnd).process(new TopN(2))resultStream.print()env.execute()}class TopN(n: Int) extends KeyedProcessFunction[Long, UrlViewCount, String] {// 声明列表状态var urlViewCountListState: ListState[UrlViewCount] = _override def open(parameters: Configuration): Unit = {urlViewCountListState = getRuntimeContext.getListState(new ListStateDescriptor[UrlViewCount]("list-state", classOf[UrlViewCount]))}override def processElement(value: UrlViewCount, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#Context, out: Collector[String]): Unit = {//每来一个数据,就直接放入ListState中urlViewCountListState.add(value)//注册一个窗口结束时间1ms之后的定时器ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {// 先把数据提取出来放到List里val urlViewCountList: List[UrlViewCount] = urlViewCountListState.get().toListval topnList: List[UrlViewCount] = urlViewCountList.sortBy(-_.count).take(n)//结果包装输出val result = new mutable.StringBuilder()result.append(s"=========窗口: ${timestamp - 1 - 10000} - ${timestamp - 1}=======\n")for (i <- topnList.indices) {val urlViewCount = topnList(i)result.append(s"浏览量Top ${i + 1} ").append(s"url: ${urlViewCount.url} ").append(s"浏览量是: ${urlViewCount.count} \n")}out.collect(result.toString())}}
}

在这里插入图片描述


总结

有关Flink底层处理函数的Api就到这里.


http://www.ppmy.cn/news/75983.html

相关文章

一、尚医通登录需求

文章目录 一、登录需求1、登录效果2、登录需求 二、登录1&#xff0c;搭建service-user模块1.1 搭建service-user模块1.2 修改配置1.3 启动类1.4 配置网关 2、添加用户基础类2.1 添加model2.2 添加Mapper2.3 添加service接口及实现类2.4 添加controller 3、登录api接口3.1 添加…

linux系统升级/更新OpenSSL版本操作流程记录

问题描述&#xff1a;有时 OpenSSL 版本过老升级&#xff0c;或者需要更新 OpenSSL 版本 1. 登录 linux 系统后输入 openssl version 查看现在使用的版本 我的输入后版本信息为&#xff1a;OpenSSL 1.1.1g FIPS 21 Apr 2020 &#xff0c;可以看到是一年前更新版本&#xff0c;…

深入浅出 SQL Server CDC 数据同步

简介 SQL Server 是一款老牌关系型数据库,自 1988 年由 Microsoft、Sybase 和 Ashton-Tate 三家公司共同推出&#xff0c;不断迭代更新至今&#xff0c;拥有相当广泛的用户群体。 如今&#xff0c;我们提到 SQL Server 通常指 Microsoft SQL Server 2000 之后的版本。 SQL S…

网络安全里主要的岗位有哪些?小白如何快速入门学习黑客?

入门Web安全、安卓安全、二进制安全、工控安全还是智能硬件安全等等&#xff0c;每个不同的领域要掌握的技能也不同。 当然入门Web安全相对难度较低&#xff0c;也是很多人的首选。主要还是看自己的兴趣方向吧。 本文就以下几个问题来说明网络安全大致学习过程&#x1f447; 网…

什么是jquery jq的基本使用

JQuery的概述 jQuery是一个快速的&#xff0c;简洁的javaScript库&#xff0c;使用户能更方便地处理HTML documents、events、实现动画效果&#xff0c;并且方便地为网站提供AJAX交互。 jQuery能够使用户的html页保持代码和html内容分离&#xff0c;也就是说&#xff0c…

Java高并发核心编程—CAS与JUC原子类

注&#xff1a;本笔记是阅读《Java高并发核心编程卷2》整理的笔记&#xff01; CAS原理 JUC原子类一Atomic 基本原子类 数组原子类 引用原子类 字段更新原子类 AtomicInteger 线程安全原理 引用类型原子类 属性更新原子类 ABA问题 提升高并发场景下CAS提作的性能 以空间换时间:…

【饿了么UI】elementUI密码框图标实现睁眼和闭眼效果(阿里巴巴iconfront图标库vue项目本地引用)

elementUI中输入框的密码框属性&#xff0c; 默认是一个始终睁眼的图标&#xff0c;测试今天提bug要有闭眼效果&#xff08;无大语&#xff09;… 因为elementUI中的icon没有闭眼的&#xff0c;所以还要去iconfront下载引入 效果图&#xff1a; 点击后 一、下载图标 http…

Spring Security 中的过滤器链是什么?它的作用是什么

Spring Security是一个安全框架&#xff0c;它提供了强大的安全保护功能&#xff0c;可以帮助开发者更加方便地实现应用程序的安全性。Spring Security中的过滤器链是其中一个非常重要的部分&#xff0c;它起到了非常重要的作用。本文将介绍什么是Spring Security中的过滤器链&…