Flink代码单词统计 ---批处理

news/2024/12/4 22:43:37/
  • flatMap:一对多转换操作,输入句子,输出分词后的每个词
  • groupBy:按Key分组,0代表选择第1列作为Key
  • sum:求和,1代表按照第2列进行累加
  • print:打印最终结果

1.WordCount代码编写

需求:统计一段文字中,每个单词出现的频次。

环境准备:在src/main/java目录下,新建一个包,命名为com.atguigu.wc。

1.1 批处理

批处理基本思路:

①.先逐行读入文件数据

②.然后将每一行文字拆分成单词

③.接着按照单词分组

④.统计每组数据的个数

⑤.就是对应单词的频次。

1.2 创建项目

1)创建工程

(1)打开IntelliJ IDEA,创建一个Maven工程。

(2)将这个Maven工程命名为Flinkdemo。

2)添加项目依赖

在项目的pom文件中,添加Flink的依赖,包括flink-java、flink-streaming-java,以及flink-clients(客户端,也可以省略)。

<properties><flink.version>1.17.0</flink.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency>
</dependencies>

3)数据准备

(1)在工程根目录下新建一个Data文件夹,并在下面创建文本文件words.txt

(2)在words.txt中输入一些文字,例如:

hello flink
hello world
hello java

4)代码编写

(1)在com.atguigu.wc包下新建Java类Demo01_BatchProcess,在静态main方法中编写代码。具体代码实现如下:

package com.atguigu.wordcount;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/**as* Created by Smexy on 2023/9/3**  计算的套路:*      ①计算的环境*          spark:SparkContext*          mr:   Driver*          flink:ExecutionEnvironment*      ②把要计算的数据封装为计算模型*         spark:  RDD(spark core)*                 DataFrame|DataSet(sparksql)*                 DStream(sparkstreaming)*         mr:     K-V*         flink:  DataSource**      ③调用计算api*          RDD.转换算子()*          mr: 自己去编写Mapper,Reducer*          flink: DataSource.算子()**  使用的是DataSetAPI(批处理)**  -------------------------*      了解。后续不用了!***/
public class Demo01_BatchProcess
{public static void main(String[] args) throws Exception {//创建支持flink计算的环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);//使用环境去读数据,封装为计算模型DataSource<String> source = env.readTextFile("data/words.txt");//调用计算apisource/*hello hi hi hi变为 (hello,1)(h1,1)(h1,1)(h1,1)输出到下游*/.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>(){/*String value: 输入的一行内容Collector<String> out: 输出结果的收集器。帮你把结果自动收集,输出到下游。单词,1: 输出的数据是多列,此时就应该使用集合或Bean来封装。flink提供了Tuple的集合。用于封装多个列。Tuple2: 用来封装2列Tuple3: 用来封装3列....Tuple25: 用来封装25列*/@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {//Tuple2<String, Integer> data = new Tuple2<>(word, 1);Tuple2<String, Integer> data = Tuple2.of(word, 1);//收集要输出的数据out.collect(data);}}})/*收到的是 (单词,1) 格式计算: 得到 (单词,N)groupBy(int fileds): 适用于 对Tuple类型的数据进行聚合。传入int N,N代表Tuple中的列的索引。groupBy(String fileds): 适用于对Bean类型的数据进行聚合,传入的String就是Bean中的属性名。*/.groupBy(0)// 对tuple2分组后的第二列进行sum运算.sum(1)//在控制台打印输出.print();}
}

5).输出

(flink,1)
(world,1)
(hello,3)
(java,1)

1.3  常见问题

问题1.

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

解决方式:maven项目的 pom.xml安装依赖:

        <dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.32</version></dependency>

问题2.

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.utils.PlanGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

解决办法:log4j没有配置日志记录的位置,需要配置log4j.properties,在src目录main目录resources文件夹下下新建log4j.properties

log4j.properties配置文件:

log4j.rootLogger=warn,CONSOLE,File#Console
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss SSS} [%t] [%c] [%p] - %m%n#File  DailyRollingFileAppender
log4j.logger.File=info
log4j.appender.File=org.apache.log4j.DailyRollingFileAppender
log4j.appender.File.layout=org.apache.log4j.PatternLayout
log4j.appender.File.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss SSS} [%t] [%c] [%p] - %m%n
log4j.appender.File.datePattern='.'yyyy-MM-dd
log4j.appender.File.Threshold = info
log4j.appender.File.append=true
log4j.appender.File.File=d://code/logs/flink/disk.log

 此时再次执行成功


http://www.ppmy.cn/news/1365908.html

相关文章

操作系统原理与实验——实验二先来先服务调度算法

实验指南 运行环境&#xff1a; Dev c 算法思想 本实验是模拟进程调度中的先来先服务算法&#xff0c;每次CPU都是按照进入就绪队列的先后次序依次选中一个进程装入CPU运行&#xff0c;等结束时再选取下一个。 核心数据结果 struct Time { int hour, min; }; struct node { int…

go语言通过切片实现先进后出逻辑

目录 一、go语言的通道: 二、go语言实现先进后出: 一、go语言的通道: Go的通道(Channel)是先进先出(FIFO)的数据结构,它保持了发送数据和接收数据的顺序。当你向通道发送数据时,数据会被放入通道的尾部;而从通道接收数据时,会从通道的头部取出数据。这确保了数据的…

大数据毕业设计—基于Python旅游数据采集可视化分析推荐系统(完整系统源码+数据库+详细文档+全源码解析)

文章目录 基于Python旅游数据采集可视化分析推荐系统&#xff08;完整系统源码数据库详细文档全源码解析&#xff09;源码获取方式在文章末尾源码获取方式在文章末尾一、项目概述二、项目说明三、开发环境四、功能实现五、系统页面实现用户登录注册系统首页数据操作管理价格与销…

vue a-table 实现指定字段相同数据合并行

vue a-table 实现相同数据合并行 实现效果代码实现cloums数据格式数据源格式合并代码 实现效果 代码实现 cloums数据格式 const getColumns function () {return [{title: "分类",dataIndex: "checked",width: "150px",customRender: (text, …

10 Redis之SB整合Redis

7. SB整合Redis Spring Boot 中可以直接使用 Jedis 实现对 Redis 的操作&#xff0c;但一般不这样用&#xff0c;而是使用 Redis操作模板 RedisTemplate 类的实例来操作 Redis。 RedisTemplate 类是一个对 Redis 进行操作的模板类。该模板类中具有很多方法&#xff0c;这些方…

SpringBoot整合rabbitmq-直连交换机队列(二)

说明&#xff1a;本文章主要是Direct定向/直连类型交换机的使用&#xff0c;它的大致流程是将一个队列绑定到一个直连交换机上&#xff0c;并赋予一个路由键 routingkey&#xff0c;当一个消息携带着路由值为routingkey&#xff0c;这个消息通过生产者发送给交换机时&#xff0…

用来检查 CUDA、Conda 和 PyTorch 的版本的python文件

提供的 Python 代码片段包括几个语句&#xff0c;用来检查 CUDA、Conda 和 PyTorch 的版本&#xff0c;以及一些与 CUDA 相关的系统配置。让我们分解一下&#xff1a;PyTorch 版本和配置&#xff1a;torch.__config__.show()&#xff1a;显示 PyTorch 的构建配置。 torch.__ver…

LeetCode69. x 的平方根(C++)

LeetCode69. x 的平方根 题目链接代码 题目链接 https://leetcode.cn/problems/sqrtx/description/ 代码 class Solution { public:int mySqrt(int x) {int right x, left 0, ans -1;while(left < right){long long mid left (right - left) / 2;if(mid * mid <…