大数据-114 Flink DataStreamAPI 程序输入源 自定义输入源 Rich并行源 RichParallelSourceFunction

server/2024/11/13 9:12:32/

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节完成了如下的内容:

  • FlinkDataStreamAPI 自定义输入源
  • 非并行源介绍与代码
  • 并行源介绍与代码

在这里插入图片描述

Rich并行源

基本介绍

在 Apache Flink 中,RichSourceFunction 是一种增强的源函数(Source Function),它允许开发者在定义源操作时,能够访问 Flink 的生命周期方法、状态管理、配置访问等更多功能。RichSourceFunction 是并行源的一个扩展,它继承自 RichFunction 接口,而 RichFunction 提供了更丰富的功能,比如访问运行时上下文、管理状态、以及在作业开始和结束时执行初始化或清理操作。

主要特点

  • 生命周期方法:RichSourceFunction 提供了 open() 和 close() 方法,分别在作业开始时和结束时调用。这允许你在数据读取前进行初始化操作(如打开连接、加载配置),以及在作业结束时进行清理工作(如关闭连接、释放资源)。
  • 访问运行时上下文:通过 getRuntimeContext() 方法,RichSourceFunction 可以访问 Flink 的运行时上下文,获取并行度信息、任务名称、指标管理器,以及与状态相关的操作。
  • 状态管理:RichSourceFunction 可以结合 Flink 的状态管理机制,保存和恢复状态。这对于需要在流处理中维护中间状态的源函数非常有用,尤其是在故障恢复时,状态可以帮助恢复到故障前的状态。
  • 并行执行:与普通的 SourceFunction 类似,RichSourceFunction 也可以通过设置并行度来并行执行,这使得它可以处理大规模的数据
    源。

状态管理

RichFunction 与 Flink 的状态管理系统高度集成,允许你在分布式环境中维护和管理操作符的中间状态。Flink 支持两种主要类型的状态:ValueState 和 ListState,以及更复杂的 MapState 和 ReducingState。

  • ValueState: 适用于需要保存单个值的场景,如计数器、标志位等。
  • ListState: 适用于需要保存多个值的场景,如窗口计算中的中间结果。
  • MapState: 适用于需要维护键值对的场景,特别是在进行复杂的数据关联或聚合时。
  • ReducingState: 适用于需要持续聚合数据的场景,比如计数、求和等。

示例代码

以下是一个使用 RichParallelSourceFunction 的简单示例,展示了如何在 Flink 中实现一个并行的、具有生命周期管理的源函数:

java">import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;public class MyRichParallelSource extends RichParallelSourceFunction<String> {private volatile boolean isRunning = true;@Overridepublic void open(Configuration parameters) throws Exception {// 在任务开始时执行初始化操作System.out.println("Task " + getRuntimeContext().getTaskName() + " is starting.");}@Overridepublic void run(SourceContext<String> ctx) throws Exception {// 模拟数据流的产生while (isRunning) {synchronized (ctx.getCheckpointLock()) {ctx.collect("Data from task " + getRuntimeContext().getIndexOfThisSubtask());}Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}@Overridepublic void close() throws Exception {// 在任务结束时执行清理操作System.out.println("Task " + getRuntimeContext().getTaskName() + " is closing.");}
}

代码解析

  • open() 方法:在任务开始时调用,适用于进行连接初始化、参数设置等操作。在这个方法中,你可以访问 Flink 的配置和运行时上下文。
  • run() 方法:实现数据源的核心逻辑,这个方法会在源函数启动后被调用。可以使用 ctx.collect() 方法将生成的数据发送到下游处理。
  • cancel() 方法:用于取消任务。当作业被取消或停止时,Flink 会调用这个方法,可以在这里做一些清理工作或者安全地停止数据生成。
  • close() 方法:在任务结束时调用,用于释放资源和进行清理操作。

注意事项

  • 状态一致性:在并行源中,如果需要维护状态,一定要注意状态的一致性和恢复机制,确保在作业恢复时可以正确地恢复数据源的状态。
  • 并行度设置:RichParallelSourceFunction 作为并行源,可以通过 setParallelism 方法设置并行度,确保根据任务的需求合理分配并行实例的数量。

RichParallelSource

java">package icu.wzk;import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;public class RichParallelSourceRich extends RichParallelSourceFunction<String> {private long count = 1L;private boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (running) {count ++;ctx.collect(String.valueOf(count));Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

RichParallelSourceTest

java">package icu.wzk;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;public class RichParallelSourceRichTest {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> data = env.getJavaEnv().addSource(new RichParallelSourceRich());data.print();env.execute("RichParallelSourceRichTest");}}

运行结果

3> 10
5> 10
8> 10
6> 10
2> 10
4> 10
7> 10
1> 10
6> 11
5> 11
8> 11
2> 11
3> 11
4> 11
7> 11
1> 11
2> 12
3> 12
...

控制台输出结果如下所示:
在这里插入图片描述

为什么 Rich 类使用广泛

  • 生命周期管理:Rich 类提供了 open() 和 close() 方法,允许开发者在任务开始和结束时执行初始化和清理操作。这对于需要设置资源(如数据库连接、文件读写、外部服务连接)的操作非常有用。
  • 运行时上下文访问:通过 getRuntimeContext(),Rich 类可以访问任务的并行度信息、任务名称、子任务索引、状态管理等。对于需要根据任务上下文调整行为或需要跨并行实例共享状态的场景,这些信息是至关重要的。
  • 状态管理:RichFunction 可以方便地与 Flink 的状态管理结合使用。在状态丰富的应用场景(如需要维护中间计算结果、计数器、缓存等)的流处理中,Rich 类显得非常有用。
  • 性能监控:Rich 类允许开发者在 open() 方法中注册 Flink 的度量指标(Metrics),帮助监控和优化作业的性能。

什么时候不用 Rich 类

  • 简单操作:如果你只是需要进行简单的转换或过滤操作,没有复杂的初始化、状态管理或清理需求,那么 Rich 类的额外功能可能并不必要。
  • 高性能要求的场景:在一些对性能要求极高的场景中,尽量减少复杂的操作和额外的上下文访问,直接使用轻量级的 MapFunction、FilterFunction 等可能会有更好的性能表现。

http://www.ppmy.cn/server/113220.html

相关文章

05:【江科大stm32】:定时器输出比较功能(PWM)

定时器输出比较功能&#xff08;PWM&#xff09; 1、LED呼吸灯2、PWM驱动舵机3、PWM驱动电机 定时器输出比较功能标准库编程的编程接口&#xff1a; 1、LED呼吸灯 ①PWM.c文件的代码如下&#xff1a; #include "stm32f10x.h" // Device header/*使…

读取obj文件中的vt并转成需要的uv数据

先展示效果图 适用场景&#xff1a;加载的obj模型需要发生形变&#xff0c;同时还要展示模型的纹理效果&#xff0c;可以使用到面料模拟或者弹性物体的模拟当中 具体实现方案&#xff1a; 1、读取obj文件中的vt的值&#xff0c;存起来 2、读取f值&#xff0c;存v索引和vt索引 3…

63、Python之函数高级:装饰器缓存实战,优化递归函数的性能

引言 通过前面的文章&#xff0c;我们已经掌握了Python中常用的装饰器的使用技巧&#xff0c;这篇文章中&#xff0c;我们通过一个装饰器的实战案例&#xff0c;来进一步加深对装饰器的适用场景的理解。 本文的主要内容有&#xff1a; 1、递归函数 2、递归实现斐波那契数列…

滚雪球学MyBatis-Plus(04):基础配置

前言 在上期内容中&#xff0c;我们详细介绍了如何进行项目初始化&#xff0c;包括添加 MyBatis Plus 依赖、配置数据库连接&#xff0c;以及创建基础的实体类和 Mapper 接口。这些步骤为我们搭建了一个基本的开发框架&#xff0c;使我们能够快速上手 MyBatis Plus 的开发工作…

维度不固定的多维数组形参笔记

在利用多维数组作为函数形参时遇到了点问题&#xff0c;如&#xff1a; void fun(char str[][10]); 这个函数可以传入多维数组&#xff0c;但元素个数必须是固定的&#xff0c;假如传入一个str[][20]&#xff0c;元素个数不一样的数组&#xff0c;那么这个函数就不适用了&…

7-8月月报 | Apache SeaTunnel社区进展一览

各位热爱 Apache SeaTunnel 的小伙伴们&#xff0c;社区 7-8 月份月报来啦&#xff01;这两个月项目有了哪些进展&#xff1f;又有谁登上了我们社区的贡献者榜单呢&#xff1f;快来一睹为快吧。 Merge Stars 感谢以下小伙伴上两个月为 Apache SeaTunnel 项目和社区发展所做的…

AWS EC2安全组配置:轻松开放端口访问

在AWS EC2实例上开放特定端口是配置服务器安全性和可访问性的重要步骤。本文中九河云将介绍如何通过AWS控制台配置EC2安全组来实现端口开放。 1. 登录AWS控制台 首先,登录到AWS管理控制台,并导航到EC2服务页面。 2. 找到目标EC2实例 在EC2控制面板中,找到需要开放端口的实例…

江协科技stm32————11-3 软件读写W25Q64

目录 MySPI.c W24Q64.c W25Q64_Ins.h main.c MySPI.c 包含通信引脚封装、初始化以及SPI通信的3个拼图&#xff08;起始、终止和交换一个字节&#xff09; #include "stm32f10x.h" // Device headervoid MySPI_W_SS(uint8_t BitValue) //ss cs…