Flink学习(七)-单词统计

ops/2024/10/22 14:26:39/

前言

Flink是流批一体的框架。因此既可以处理以流的方式处理,也可以按批次处理。

一、代码基础格式

//1st 设置执行环境
xxxEnvironment env = xxxEnvironment.getEnvironment;//2nd 设置流
DataSource xxxDS=env.xxxx();//3rd 设置转换
Xxx transformation =xxxDS.xxxx();//4th 设置sink
transformation.print();//5th 可能需要
env.execute();

二、Demo1 批处理

  • 源码

 public static void main(String[] args) throws Exception {//1,创建一个执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//2,获取输入流DataSource<String> lineDS = env.readTextFile("input/word.txt");//3,处理数据FlatMapOperator<String, Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {//3.1 分隔字符串String[] values = value.split(" ");//3.2 汇总统计for (String word : values) {Tuple2<String, Integer> wordTuple = Tuple2.of(word, 1);collector.collect(wordTuple);}}});//4,按单词聚合UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping = wordDS.groupBy(0);//5,分组内聚合AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);//6,输出结果sum.print();}
  • 效果展示

三、Demo2 流处理

  • 源码

   public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> lineDS = env.readTextFile("input/word.txt");SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] words = value.split(" ");for (String word : words) {Tuple2<String, Integer> temp = Tuple2.of(word, 1);collector.collect(temp);}}});KeyedStream<Tuple2<String, Integer>, Tuple> wordCountKeyBy = wordDS.keyBy(0);SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordCountKeyBy.sum(1);sum.print();env.execute();}
  • 效果展示

四、Demo3 无边界流处理

  • 源码

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> lineDS = env.socketTextStream("192.168.3.11", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> sum = lineDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1);sum.print();env.execute();}
  • 效果展示 

往192.168.3.11的9999端口上持续输送数据流,程序端会出现如下统计


http://www.ppmy.cn/ops/7431.html

相关文章

JDK8开始新增的日期和时间

1、为什么要用JDK8开始新增的时间表&#xff1a; 传统的时间类&#xff08;Date&#xff0c;simpleDateFormat&#xff0c;Calender&#xff09;存在如下问题&#xff1a; ①设计不合理&#xff0c;使用不方便&#xff0c;很多都被淘汰了 ②都是可变对象&#xff0c;修改后会…

解决程序化刷新EXCEL提示更新外部链接的弹窗问题

解决方法 【信任中心】-> 【消息栏】->勾选如下策略提示 2. 【信任中心】->【外部内容】->启用下面的三项链接 3. 【信任中心】->【宏设置】->启用所有宏

C++ 随机数

在许多情况下&#xff0c;需要生成随机数。关于随机数生成器&#xff0c;有两个相关的函数。一个是 rand()&#xff0c;该函数只返回一个伪随机数。生成随机数之前必须先调用 srand() 函数。 下面是一个关于生成随机数的简单实例。实例中使用了 time() 函数来获取系统时间的秒…

web安全学习笔记(10)

记一下第十四节课的内容。 一、MySQL学习 数据库基本结构&#xff1a;库——表——列——值 在本地打开navicat&#xff0c;连接数据库&#xff0c;新建一个liuyan库、liuyan库下新建一个member表&#xff1a; 在表里随意添加一些数据&#xff1a; 下面我们学习MySQL查询。新…

MATLAB求和函数

语法 S sum(A) S sum(A,“all”) S sum(A,dim) S sum(A,vecdim) S sum(,outtype) S sum(,nanflag) 说明 示例 S sum(A) 返回沿大小大于 1 的第一个数组维度计算的元素之和。 如果 A 是向量&#xff0c;则 sum(A) 返回元素之和。 如果 A 是矩阵&#xff0c;则 sum(A) 将…

今日刷三题(day4):简写单词+dd爱框框+除2!

题目一&#xff1a;简写单词 题目描述&#xff1a; 比如 “College English Test”可以简写成“CET”&#xff0c;“Computer Science”可以简写为“CS”&#xff0c;“I am Bob”简写为“IAB” 输入输出描述&#xff1a; 输入&#xff1a;一个复合单词 输出&#xff1a;输…

Compose 布局

文章目录 Compose 布局ColumnColumn属性使用 RowRow属性使用 BoxBox属性使用 ConstraintLayoutLazyColumnLazyColumn属性使用使用多类型使用粘性标题回到顶部 LazyRowLazyRow属性使用 LazyVerticalGridLazyVerticalGrid属性使用 Compose 布局 Column Compose中的”垂直线性布…

C++语言·内存管理

本节内容比较简单&#xff0c;重点就是new和delete关键字&#xff0c;写的就比较粗糙了 1. C/C内存分布 栈上的变量是即用即销毁&#xff0c;堆上的变量按需申请释放&#xff0c;静态和全局的生命周期是整个程序 没编译运行起来的代码叫“程序”&#xff0c;编译运行起来的程序…