mapreduce__0">一. mapreduce 概述
mapreduce是一个并行计算框架,它起源于Google的MapReduce论文,它主要用于离线海量数据计算。
- 优点:海量数据离线处理,开发简单,部署方便
- 缺点:仅适用于批处理,不支持实时数据计算
wordcount_5">二. wordcount案例
1. 需求
统计一个文件中每个单词出现的次数(文件中每行的多个单词用空格分开),下面是用mapreduce实现wordcount的数据流程:
2. 代码实现
package mr;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCountMR {// 实现map方法static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] words = value.toString().split(" ");for (String word: words) {context.write(new Text(word), new IntWritable(1));}}}// 实现reduce方法static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {@Overridepublic void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}context.write(key, new IntWritable(sum));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(WordCountMR.class);job.setJobName("WordCount");// 设置输入,输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 设置Mapperjob.setMapperClass(WordCountMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 设置Reducerjob.setReducerClass(WordCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 设置reduce task 数量job.setNumReduceTasks(1);boolean waitFor = job.waitForCompletion(true);System.exit(waitFor ? 0 : 1);}
}
maven 依赖
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.2.4</version>
</dependency>
3. 运行
mapreduce程序运行方式可以分为本地模式和集群模式
本地运行模式:方便程序开发与调试
输入文件:
结果文件:
集群运行模式:可以利用集群的计算资源,一般为生产部署方式
将代码打包,并上传到集群上去。
# 查看输入文件
[root@hadoop1 ~]# hdfs dfs -text /test/a.txt
hello world
name hello
world# 提交任务
[root@hadoop1 ~]# hadoop jar learn-1.0-SNAPSHOT.jar mr.WordCountMR /test/a.txt /output# 查看结果文件
[root@hadoop1 ~]# hdfs dfs -text /output/part-r-00000
hello 2
name 1
world 2