Flink一些常用API的使用(Flink中的Source以及Flink中的一些常用算子)

news/2024/12/15 6:11:49/

文章目录

  • 一、Source
    • 1、预定义Source
    • 2、自定义Source【重要】
    • 3、Kafka Source【重要】
  • 二、Transformation-转换算子
    • 1、union和connect-合并和连接
    • 2、Side Outputs(侧道输出)--分流


一、Source

1、预定义Source

基于本地集合的source(Collection-based-source)【测试】

  • 1.env.fromElements(可变参数);
  • 2.env.fromColletion(各种集合);
  • 3.env.fromSequence(开始,结束);

基于文件的source(File-based-source)

env.readTextFile(文件系统路径,包括hdfs路径);

基于网络套接字(socketTextStream)【测试】

socketTextStream(主机名, 端口) :非并行的Source

2、自定义Source【重要】

  • SourceFunction:非并行数据源(并行度只能=1) --接口
  • RichSourceFunction:多功能非并行数据源(并行度只能=1) --类
  • ParallelSourceFunction:并行数据源(并行度能够>=1) --接口
  • RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

下面是代码案例:

package com.bigdata.source;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import java.util.Random;
import java.util.UUID;@Data
@NoArgsConstructor
@AllArgsConstructor
class Order {private String orderId;private int uid;private int money;private long timeStamp;
}// 继承RichParallelSourceFunction,需要加泛型
class AutoCreatOrderSource extends RichParallelSourceFunction<Order> {boolean flag = true;/*** 需求: 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)* 要求:* - 随机生成订单ID(UUID)* - 随机生成用户ID(0-2)* - 随机生成订单金额(0-100)* - 时间戳为当前系统时间*/Random random = new Random();@Overridepublic void run(SourceContext sourceContext) throws Exception {while (true) {String OrderId = UUID.randomUUID().toString();int UserId = random.nextInt(3);int money = random.nextInt(101);long ts = System.currentTimeMillis();Order order = new Order(OrderId, UserId, money, ts);sourceContext.collect(order);Thread.sleep(1000);}}@Overridepublic void cancel() {flag = false;}
}public class CustomSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 将自定义的数据源放入到env中DataStreamSource<Order> dataStreamSource = env.addSource(new AutoCreatOrderSource());dataStreamSource.print();env.execute();}
}

3、Kafka Source【重要】

代码案例:

package com.bigdata.source;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class KafkaSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.put("bootstrap.servers","node01:9092");properties.put("group.id", "g1");FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("topicA",new SimpleStringSchema(),properties);DataStreamSource<String> stringDataStreamSource = env.addSource(kafkaConsumer);stringDataStreamSource.print();env.execute("KafkaSource");}
}

二、Transformation-转换算子

有map、flatMap、filter、keyBy、reduce、union和connect(合并和连接)、Side Outputs(侧道输出--分流)

1、union和connect-合并和连接

  • union合并的DataSet的类型必须是一致的,可以取并集,但是不会去重
  • connect可以连接2个不同类型的流(最后需要处理后再输出),和union类似,但是connect只能连接两个流,两个流之间的数据类型可以不同,对两个流的数据可以分别应用不同的处理逻辑

2、Side Outputs(侧道输出)–分流

举例说明:对流中的数据按照奇数和偶数进行分流,并获取分流后的数据

package com.bigdata.transformation;import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/**@基本功能:@program:FlinkDemo@author: hang@create:2024-11-22 11:19:46**/
public class SideOutputs {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<Long> dataStreamSource = env.fromSequence(1, 100);// 定义两个标签OutputTag<Long> even = new OutputTag<>("偶数",TypeInformation.of(Long.class));OutputTag<Long> odd = new OutputTag<>("奇数", TypeInformation.of(Long.class));//3. transformation-数据处理转换SingleOutputStreamOperator<Long> process = dataStreamSource.process(new ProcessFunction<Long, Long>() {@Overridepublic void processElement(Long value, ProcessFunction<Long, Long>.Context ctx, Collector<Long> out) throws Exception {if (value % 2 == 0) {// 打上偶数标签ctx.output(even, value);} else {// 打上奇数标签ctx.output(odd, value);}}});//4. sink-数据输出// 从数据集中获取偶数的所有数据DataStream<Long> sideOutput = process.getSideOutput(even);sideOutput.print("偶数");// 从数据集中获取奇数的所有数据DataStream<Long> sideOutput1 = process.getSideOutput(odd);sideOutput1.print("奇数");//5. execute-执行env.execute();}
}

http://www.ppmy.cn/news/1555226.html

相关文章

Vue前端开发-axios对象实例创建和配置的过程

在Vue 3中&#xff0c;由于所有的组件都可能去请求数据&#xff0c;因此&#xff0c;针对axios模块的配置应该是全局性的&#xff0c;在进行axios模块的全局配置之前&#xff0c;需要了解axios实例的创建、配置对象和响应对象的结构内容&#xff0c;接下来我们分别来进行介绍。…

opencv腐蚀和膨胀

腐蚀的核心在于把图片中白色的细微线条去除,膨胀则会将白线条扩大 # 导入OpenCV库&#xff0c;用于图像处理 import cv2 import numpy as np # 从matplotlib库中导入pyplot模块&#xff0c;用于绘制图像 from matplotlib import pyplot as plt # 创建一个名为window…

金融信息分析基础(1)

1.金融数据 金融数据分为&#xff1a;交易数据&#xff08;低频数据&#xff0c;高频数据&#xff0c;超高频数据&#xff09;&#xff0c;报表数据&#xff08;财务报表&#xff0c;研报&#xff09;&#xff0c;金融社交媒体数据 低频数据&#xff1a; 以日、周、月、季、年…

分布式中的CAP定理和BASE理论与强弱一致性

分布式中的CAP定理和BASE理论与强弱一致性 CAP定理 CAP定理&#xff0c;也称为布鲁尔定理&#xff08;Brewer’s Theorem&#xff09;&#xff0c;是由加州大学伯克利分校的Eric Brewer教授在2000年提出的&#xff0c;并由麻省理工学院的Seth Gilbert和Nancy Lynch于2002年正…

【Linux课程学习】:第二十一弹---深入理解信号(中断,信号,kill,abort,raise,larm函数)

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;Linux课程学习 &#x1f337;追光的人&#xff0c;终会万丈光芒 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 ​ Linux学习笔记&#xff1a; https://blog.csdn.…

360智脑张向征:共建可信可控AI生态 应对大模型安全挑战

发布 | 大力财经 人工智能的加速发展&#xff0c;有力推动了社会的数智化转型&#xff1b;与此同时&#xff0c;带来的相关安全风险也日益凸显。近日&#xff0c;在北京市举办的通明湖人工智能开发与应用大会上&#xff0c;360智脑总裁张向征以“大模型安全研究与实践”为主题&…

编译glibc

首先下载glibc库 glibc官网&#xff1a;https://sourceware.org/glibc/sources.html 可以通过git的方式下载glibc对应的git库 ​git clone https://sourceware.org/git/glibc.gitcd glibcgit checkout master​ 也可以通过ftp下载对应版本的glibc的源码包 地址&#xff1a;…

服务发现Discovery和Eureka自我保护

服务发现Discovery和Eureka自我保护 1.controller添加 RestController Slf4j public class PaymentController {Resourceprivate DiscoveryClient discoveryClient;GetMapping(value "/payment/discovery")public Object discovery(){List<String> services…