MapReduce简单应用(一)——WordCount

ops/2025/2/1 11:01:15/

目录

  • 1. 执行过程
    • 1.1 分割
    • 1.2 Map
    • 1.3 Combine
    • 1.4 Reduce
  • 2. 代码和结果
    • 2.1 pom.xml中依赖配置
    • 2.2 工具类util
    • 2.3 WordCount
    • 2.4 结果
  • 参考

1. 执行过程

  假设WordCount的两个输入文本text1.txt和text2.txt如下。

Hello World
Bye World
Hello Hadoop
Bye Hadoop

1.1 分割

  将每个文件拆分成split分片,由于测试文件比较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对,如下图所示。这一步由MapReduce自动完成,其中key值为偏移量,由MapReduce自动计算出来,包括回车所占的字符数。
在这里插入图片描述

1.2 Map

  将分割好的<key,value>对交给用户定义的Map方法处理,生成新的<key,value>对。处理流程为先对每一行文字按空格拆分为多个单词,每个单词出现次数设初值为1,key为某个单词,value为1,如下图所示。
在这里插入图片描述

1.3 Combine

  得到Map方法输出的<key,value>对后,Mapper将它们按照key值进行升序排列,并执行Combine合并过程,将key值相同的value值累加,得到Mapper的最终输出结果,并写入磁盘,如下图所示。
在这里插入图片描述

1.4 Reduce

  Reducer先对从Mapper接受的数据进行排序,并将key值相同的value值合并到一个list列表中,再交由用户自定义的Reduce方法进行汇总处理,得到新的<key,value>对,并作为WordCount的输出结果,存入HDFS,如下图所示。
在这里插入图片描述

2. 代码和结果

2.1 pom.xml中依赖配置

	<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.3.6</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.3.6</version><type>pom</type></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-jobclient</artifactId><version>3.3.6</version></dependency></dependencies>

2.2 工具类util

  util.removeALL的功能是删除hdfs上的指定输出路径(如果存在的话),而util.showResult的功能是打印wordcount的结果。

import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;public class util {public static FileSystem getFileSystem(String uri, Configuration conf) throws Exception {URI add = new URI(uri);return FileSystem.get(add, conf);}public static void removeALL(String uri, Configuration conf, String path) throws Exception {FileSystem fs = getFileSystem(uri, conf);if (fs.exists(new Path(path))) {boolean isDeleted = fs.delete(new Path(path), true);System.out.println("Delete Output Folder? " + isDeleted);}}public static void  showResult(String uri, Configuration conf, String path) throws Exception {FileSystem fs = getFileSystem(uri, conf);String regex = "part-r-";Pattern pattern = Pattern.compile(regex);if (fs.exists(new Path(path))) {FileStatus[] files = fs.listStatus(new Path(path));for (FileStatus file : files) {Matcher matcher = pattern.matcher(file.getPath().toString());if (matcher.find()) {FSDataInputStream openStream = fs.open(file.getPath());IOUtils.copyBytes(openStream, System.out, 1024);openStream.close();}}}}
}

2.3 WordCount

  正常来说,MapReduce编程都是要把代码打包成jar文件,然后用hadoop jar jar文件名 主类名称 输入路径 输出路径。下面代码中直接给出了输入和输出路径,可以直接运行。

import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class App {public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {System.out.println(key + " " + value);Text keyOut;IntWritable valueOut = new IntWritable(1);StringTokenizer token = new StringTokenizer(value.toString());while (token.hasMoreTokens()) {keyOut = new Text(token.nextToken());context.write(keyOut, valueOut);}}}public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}context.write(key, new IntWritable(sum));} }public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] myArgs = {"file:///home/developer/CodeArtsProjects/WordCount/text1.txt", "file:///home/developer/CodeArtsProjects/WordCount/text2.txt", "hdfs://localhost:9000/user/developer/wordcount/output"};util.removeALL("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);Job job = Job.getInstance(conf, "wordcount");job.setJarByClass(App.class);job.setMapperClass(MyMapper.class);job.setReducerClass(MyReducer.class);job.setCombinerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);for (int i = 0; i < myArgs.length - 1; i++) {FileInputFormat.addInputPath(job, new Path(myArgs[i]));}FileOutputFormat.setOutputPath(job, new Path(myArgs[myArgs.length - 1]));int res = job.waitForCompletion(true) ? 0 : 1;if (res == 0) {System.out.println("WordCount结果:");util.showResult("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);}System.exit(res);}
}

2.4 结果

在这里插入图片描述

参考

吴章勇 杨强著 大数据Hadoop3.X分布式处理实战


http://www.ppmy.cn/ops/154721.html

相关文章

OceanBase 读写分离探讨

版本信息 OceanBase: 4.2.1.10 OBProxy: 4.3.3.0 租户类型&#xff1a; MySQL租户 弱一致性读 官方声明 默认情况下,所有的请求都是发送到数据的 Leader 副本上&#xff0c;即强一致性的请求&#xff0c;因为 OLAP 的分析计算&#xff0c;一般对于数据的一致性要求不高&…

创建前端项目的方法

目录 一、创建前端项目的方法 1.前提&#xff1a;安装Vue CLI 2.方式一&#xff1a;vue create项目名称 3.方式二&#xff1a;vue ui 二、Vue项目结构 三、修改Vue项目端口号的方法 一、创建前端项目的方法 1.前提&#xff1a;安装Vue CLI npm i vue/cli -g 2.方式一&…

二叉树的最大深度(遍历思想+分解思想)

Problem: 104. 二叉树的最大深度 文章目录 题目描述思路复杂度Code 题目描述 思路 遍历思想(实则二叉树的先序遍历) 1.欲望求出最大的深度&#xff0c;先可以记录一个变量res&#xff0c;同时记录每次当前节点所在的层数depth 2.在递的过程中&#xff0c;每次递一层&#xff0…

蓝桥云课下载(jdk11、eclipse、idea)

目录 下载jdk11下载eclipse下载idea 下载jdk11 下载地址&#xff1a; &#xff08;自用&#xff09; https://www.lanqiao.cn/courses/44495/learning/?id3144371&compatibilityfalse 安装步骤&#xff1a; 双击 -> -> 下一步 -> 配置环境变量&#xff08;略…

【深度之眼cs231n第七期】笔记(三十一)

目录 强化学习什么是强化学习&#xff1f;马尔可夫决策过程&#xff08;MDP&#xff09;Q-learning策略梯度SOTA深度强化学习 还剩一点小尾巴&#xff0c;还是把它写完吧。&#xff08;距离我写下前面那行字又过了好几个月了【咸鱼本鱼】&#xff09;&#xff08;汗颜&#xff…

在sortablejs的拖拽排序情况下阻止input拖拽事件

如题 问题 在vue3的elementPlus的table中&#xff0c;通过sortablejs添加了行拖拽功能&#xff0c;但是在行内会有输入框&#xff0c;此时拖拽输入框会触发sortablejs的拖拽功能 解决 基于这个现象&#xff0c;我怀疑是由于拖拽事件未绑定而冒泡到后面的行上从而导致的拖拽…

1 HDFS

1 HDFS 1. HDFS概述2. HDFS架构3. HDFS的特性4. HDFS 的命令行使用5. hdfs的高级使用命令6. HDFS 的 block 块和副本机制6.1 抽象为block块的好处6.2 块缓存6.3 hdfs的文件权限验证6.4 hdfs的副本因子 7. HDFS 文件写入过程&#xff08;非常重要&#xff09;7.1 网络拓扑概念7.…

IDEA创建修改gitee仓库

一、创建gitee仓库 创建gitee仓库 点击复制仓库地址 右击项目名 --> Open In --> Terminal 初始化仓库 git init 添加仓库地址 复制之前创建的仓库地址 git remote add origin 仓库地址 二、修改IDEA的gitee仓库 查询当前项目所在仓库 git remote -v 删除原仓库…