Flink 实现无界流

ops/2024/11/15 4:43:06/

Flink 实现无界流

package org.example.test;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** DataSet API使用*/
public class WordCount2 {public static void main(String[] args) throws Exception {//该类主要是用于进行批处理final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//读取文本DataSource<String> stringDataStreamSource = env.readTextFile("input/test.txt");//进行ETL处理,Tuple2 是二元数组的意思FlatMapOperator<String, Tuple2<String, Integer>> stringTuple2FlatMapOperator = stringDataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {Tuple2<String, Integer> oneTuple2 = Tuple2.of(word, 1);out.collect(oneTuple2);}}});//进行分组,分组字段取下标第0个UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping =stringTuple2FlatMapOperator.groupBy(0);//进行sum操作AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);sum.print();}
}

ExecutionEnvironment 是批处理的方式,DataSource会慢慢被淘汰


http://www.ppmy.cn/ops/114571.html

相关文章

【数据结构-差分】力扣1589. 所有排列中的最大和

有一个整数数组 nums &#xff0c;和一个查询数组 requests &#xff0c;其中 requests[i] [starti, endi] 。第 i 个查询求 nums[starti] nums[starti 1] … nums[endi - 1] nums[endi] 的结果 &#xff0c;starti 和 endi 数组索引都是 从 0 开始 的。 你可以任意排列…

【数据结构】经典题

所以&#xff0c;语句 x; 的语句频度为&#xff1a;n(n1)(n2&#xff09;/6 选C 临时变量 t&#xff1a;只使用了一个额外的变量来存储交换的值。 没有使用额外的数组&#xff1a;所有的操作都是在原数组 a 上进行的。 因此&#xff0c;算法的空间复杂度是常数级别的&#xff0…

使用Apache SeaTunnel高效集成和管理SftpFile数据源

本文为Apache SeaTunnel已经支持的SftpFile Source Connector使用文档&#xff0c;旨在帮助读者理解如何高效地使用SFTP文件源连接器&#xff0c;以便轻松地使用Apache SeaTunnel集成和管理您的SftpFil数据源。 SftpFile 是指通过 SFTP&#xff08;Secure File Transfer Proto…

LeetCode题练习与总结:回文链表--234

一、题目描述 给你一个单链表的头节点 head &#xff0c;请你判断该链表是否为回文链表。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,2,1] 输出&#xff1a;true示例 2&#xff1a; 输入&#x…

python函数的一些介绍

函数的多返回值 def 函数(): return 1,2,3 x,y,z 函数&#xff08;&#xff09;#对应1&#xff0c;2&#xff0c;3 有几个就要有对应的几个变量存储&#xff0c;不然会报错 函数的关键字参数 def 函数&#xff08;name,id&#xff09;&#xff1a; 打印输出name和id 函数…

CSAPP Bomb Lab

本 Lab 可以说是 CSAPP 的几个 Lab 中最为人津津乐道的一个&#xff0c;对应知识点为书中的第 3 章&#xff08;程序的机器级表示&#xff09;&#xff0c;要求使用 GDB 调试器&#xff0c;对汇编语言进行调试&#xff0c;从而得出正确的“拆弹密码”。共分为 6 个关卡和一个隐…

Android 命令行关机

在 Android 设备上&#xff0c;可以通过以下命令行命令来关机&#xff1a; adb shell reboot -p其中&#xff1a; adb shell&#xff1a;通过 ADB 进入设备的命令行环境。reboot -p&#xff1a;执行关机操作&#xff0c;-p 表示关机而不是重启。 如果你是在设备本地的终端上而…

Linux(Centos7)系统下给已有分区进行扩容

本文详细介绍了&#xff0c;如何给Centos中已有分区进行扩容&#xff0c;简单的几条命令即可完成。 文章目录 1. 创建物理卷 (PV)2. 将新的物理卷添加到卷组 (VG)3. 扩展逻辑卷 (LV)4. 扩展文件系统4.1 查看文件系统类型4.2 扩展文件系统 完成 1、首先把vmware中的linux关机&am…