Flink学习(七)-单词统计

embedded/2024/9/23 21:16:53/

前言

Flink是流批一体的框架。因此既可以处理以流的方式处理,也可以按批次处理。

一、代码基础格式

//1st 设置执行环境
xxxEnvironment env = xxxEnvironment.getEnvironment;//2nd 设置流
DataSource xxxDS=env.xxxx();//3rd 设置转换
Xxx transformation =xxxDS.xxxx();//4th 设置sink
transformation.print();//5th 可能需要
env.execute();

二、Demo1 批处理

  • 源码

 public static void main(String[] args) throws Exception {//1,创建一个执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//2,获取输入流DataSource<String> lineDS = env.readTextFile("input/word.txt");//3,处理数据FlatMapOperator<String, Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {//3.1 分隔字符串String[] values = value.split(" ");//3.2 汇总统计for (String word : values) {Tuple2<String, Integer> wordTuple = Tuple2.of(word, 1);collector.collect(wordTuple);}}});//4,按单词聚合UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping = wordDS.groupBy(0);//5,分组内聚合AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);//6,输出结果sum.print();}
  • 效果展示

三、Demo2 流处理

  • 源码

   public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> lineDS = env.readTextFile("input/word.txt");SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] words = value.split(" ");for (String word : words) {Tuple2<String, Integer> temp = Tuple2.of(word, 1);collector.collect(temp);}}});KeyedStream<Tuple2<String, Integer>, Tuple> wordCountKeyBy = wordDS.keyBy(0);SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordCountKeyBy.sum(1);sum.print();env.execute();}
  • 效果展示

四、Demo3 无边界流处理

  • 源码

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> lineDS = env.socketTextStream("192.168.3.11", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> sum = lineDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1);sum.print();env.execute();}
  • 效果展示 

往192.168.3.11的9999端口上持续输送数据流,程序端会出现如下统计


http://www.ppmy.cn/embedded/7147.html

相关文章

MATLAB求和函数

语法 S sum(A) S sum(A,“all”) S sum(A,dim) S sum(A,vecdim) S sum(,outtype) S sum(,nanflag) 说明 示例 S sum(A) 返回沿大小大于 1 的第一个数组维度计算的元素之和。 如果 A 是向量&#xff0c;则 sum(A) 返回元素之和。 如果 A 是矩阵&#xff0c;则 sum(A) 将…

电脑工作者缓解眼部疲劳问题的工具分享

背景 作为以电脑为主要工作工具的人群&#xff0c;特别是开发人员&#xff0c;我们每天都需要长时间紧盯着屏幕&#xff0c;进行代码编写、程序调试、资料查询等工作。这种持续的工作模式无疑给我们的眼睛带来了不小的负担。一天下来&#xff0c;我们常常会感到眼睛干涩、疲劳…

转:Learn Rust the Dangerous Way-系列文章翻译-总述

原文地址 太精彩了&#xff0c;不转不足以表达我的喜爱。 前言 《Learn Rust the Dangerous Way》​cliffle.com/p/dangerust/ 最近发现了一个学习Rust的优秀系列文章&#xff0c;本人准备对该系列文章进行翻译。 本文是《Learn Rust the Dangerous Way》系列文章翻译的第…

解构 和 展开运算符

解构 {name,age}obj 1. 数组解构 数组解构是将数组的单元值快速批量赋值给一系列变量的简洁语法&#xff0c;如下代码所示&#xff1a; <script>// 普通的数组let arr [1, 2, 3];// 批量声明变量 a b c// 同时将数组单元值 1 2 3 依次赋值给变量 a b clet [a, b, c] …

【zml】vp9 vp8

目录 问题 方案 知识点 研究过程 源码编译的可能 按编译源码测试 2024-4-17 问题 所有 的机型 中&#xff0c;就海思芯片的有这个问题。应该是它的h264的编解码 问题&#xff0c;所以目前是让它以vp9在推流就没有问题。 但zlm对于vp9的录相是没有实现的。 所以目前现状…

鸿蒙原生应用元服务-访问控制(权限)开发Stage模型向用户申请授权

一、向用户申请授权 当应用需要访问用户的隐私信息或使用系统能力时&#xff0c;例如获取位置信息、访问日历、使用相机拍摄照片或录制视频等&#xff0c;应该向用户请求授权。这需要使用 user_grant 类型权限。在此之前&#xff0c;应用需要进行权限校验&#xff0c;以判断当前…

十大开源机器人 智能体

1- Poppy 网址 https://www.poppy-project.org/en/ 2- Nao 网址:https://www.aldebaran.com/en/nao 3- iCub 网址: https://icub.iit.it/

开发语言漫谈-rust

前面介绍C语言家族时忘掉了rust&#xff0c;紧急补一篇。我们称C语言家族是指他们的语法相似&#xff0c;类似这样的&#xff1a; if(){}else{}就是C家族的。C、C的传统领域就是系统底层、硬件接口方向。C/C没有垃圾内存回收机制&#xff0c;完全靠程序员的自觉天赋&#xff0…