Hadoop3教程(十七):MapReduce之ReduceJoin案例分析

news/2024/11/20 1:32:02/

文章目录

  • (113)ReduceJoin案例需求分析
  • (114)ReduceJoin案例代码实操 - TableBean
  • (115)ReduceJoin案例代码实操 - TableMapper
  • (116)ReduceJoin案例代码实操 - Reducer及Driver
  • 参考文献

(113)ReduceJoin案例需求分析

现在有两个文件:

  • orders.txt,存放的是订单ID、产品ID、产品数量
  • pd.txt,这是一个产品码表,存放的是产品ID、产品中文名;

现在是想通过join,来实现这么一个预期输出,即订单ID、产品中文名、产品数量。

以上是本次案例需求。

简单思考一下思路。我们需要将关联条件作为Map输出的key,将两表满足Join条件的数据以及数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联。

具体该怎么做呢?

Map中在处理的时候,需要获取输入的文件内容和文件名(这个是可以在切片的时候获取的),然后不同文件分别做不同处理,处理完成后封装bean对象输出。

注意,Map在输出的时候,需要以产品ID作为key,只有这样做,才能将相同产品ID的orders.txt记录和pd.txt记录,放在同一个reduceTask里,进而实现最终的替换。value的话,选择订单ID、订单数量、文件名。这里传入文件名的原因是Reduce阶段需要根据不同文件名实现不同处理,所以一定得需要传一个文件名进来。

另外提一句,封装bean对象的时候,需要把两个文件里的所有字段合起来作为一个bean对象,这样子,orders文件的数据可以用这个bean对象,pd.txt里的数据也可以用这个bean对象。相当于做一个大宽表。

reduce阶段就很简单了,相同产品ID的orders.txt记录和pd.txt记录,被放在同一个reduceTask里,可以把来自orders的bean放在一个集合里,来自pd的bean放在一个集合里,然后遍历set覆盖就可以。

(114)ReduceJoin案例代码实操 - TableBean

首先需要定义一个Bean对象,用来序列化两个输入文件的数据,我们命名为TableBean。

package com.atguigu.mapreduce.reducejoin;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class TableBean implements Writable {private String id; //订单idprivate String pid; //产品idprivate int amount; //产品数量private String pname; //产品名称private String flag; //判断是order表还是pd表的标志字段public TableBean() {}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getPid() {return pid;}public void setPid(String pid) {this.pid = pid;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount = amount;}public String getPname() {return pname;}public void setPname(String pname) {this.pname = pname;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}@Overridepublic String toString() {return id + "\t" + pname + "\t" + amount;}// 序列化方法@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(id);out.writeUTF(pid);out.writeInt(amount);out.writeUTF(pname);out.writeUTF(flag);}// 反序列化方法// 注意,序列化的顺序必须要跟反序列化的顺序一致@Overridepublic void readFields(DataInput in) throws IOException {this.id = in.readUTF();this.pid = in.readUTF();this.amount = in.readInt();this.pname = in.readUTF();this.flag = in.readUTF();}
}

注意,序列化的顺序必须要跟反序列化的顺序一致。

(115)ReduceJoin案例代码实操 - TableMapper

TableMapper的主要作用,就是将输入的数据,划分成指定的KV对,以供Reduce阶段使用。

命名为TableMapper,获取文件名称的代码也包含在这里。

package com.atguigu.mapreduce.reducejoin;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class TableMapper extends Mapper<LongWritable,Text,Text,TableBean> {private String filename;private Text outK = new Text();private TableBean outV = new TableBean();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//获取对应文件名称InputSplit split = context.getInputSplit();FileSplit fileSplit = (FileSplit) split;filename = fileSplit.getPath().getName();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取一行String line = value.toString();//判断是哪个文件,然后针对文件进行不同的操作if(filename.contains("order")){  //订单表的处理String[] split = line.split("\t");//封装outKoutK.set(split[1]);//封装outVoutV.setId(split[0]);outV.setPid(split[1]);outV.setAmount(Integer.parseInt(split[2]));outV.setPname("");outV.setFlag("order");}else {                             //商品表的处理String[] split = line.split("\t");//封装outKoutK.set(split[0]);//封装outVoutV.setId("");outV.setPid(split[0]);outV.setAmount(0);outV.setPname(split[1]);outV.setFlag("pd");}//写出KVcontext.write(outK,outV);}
}

(116)ReduceJoin案例代码实操 - Reducer及Driver

主要是编写Reduce部分。Mapper之后,一组相同的key的数据会进入一个ReduceTask,接下来需要编写自定义逻辑,让Reduce可以实现关联后输出。

需要创建两个集合,每个集合接收不同文件,一个接收order文件数据,另一个接收码表数据。然后循环遍历order集合,把码表集合里的值set进去。

新建TableReducer:

package com.atguigu.mapreduce.reducejoin;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {ArrayList<TableBean> orderBeans = new ArrayList<>();TableBean pdBean = new TableBean();for (TableBean value : values) {//判断数据来自哪个表if("order".equals(value.getFlag())){   //订单表//创建一个临时TableBean对象接收valueTableBean tmpOrderBean = new TableBean();try {BeanUtils.copyProperties(tmpOrderBean,value);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}//将临时TableBean对象添加到集合orderBeansorderBeans.add(tmpOrderBean);}else {                                    //商品表try {BeanUtils.copyProperties(pdBean,value);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}}//遍历集合orderBeans,替换掉每个orderBean的pid为pname,然后写出for (TableBean orderBean : orderBeans) {orderBean.setPname(pdBean.getPname());//写出修改后的orderBean对象context.write(orderBean,NullWritable.get());}}
}

根据教程上说的,这里只有一个地方需要注意,但是用处也不是很大,就是 集合在add value的时候,不能直接add 传进来的value,而是需要重新new一个TableBean,将value值赋值给这个新的TableBean,最后add这个新的TableBean。

这么做的原因是,传进来的values,其实是一个Iterable<TableBean> ,不是传统意义上的迭代器,可以简单理解成,Iterable<TableBean> 里的每个value用的是同一个内存地址,每次读取出value就总是赋给那个内存地址,所以不能直接add value,否则add 一百次,也只会记住最后一次add的那个value。

这似乎是Hadoop为了避免因创建过多实例引起资源浪费,而做的优化。

没有测过,做简单了解吧。

最后在驱动类里注册:

package com.atguigu.mapreduce.reducejoin;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class TableDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Job job = Job.getInstance(new Configuration());job.setJarByClass(TableDriver.class);job.setMapperClass(TableMapper.class);job.setReducerClass(TableReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TableBean.class);job.setOutputKeyClass(TableBean.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path("D:\\input"));FileOutputFormat.setOutputPath(job, new Path("D:\\output"));boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

大功告成

参考文献

  1. 【尚硅谷大数据Hadoop教程,hadoop3.x搭建到集群调优,百万播放】

http://www.ppmy.cn/news/1158420.html

相关文章

Linux: Error: EACCES: permission denied Linux 解决方法

原因&#xff1a;Linux终端创建文件夹无权限。 解决方法&#xff1a;输入命令 sudo chmod -R 777 /工作目录 例如&#xff1a;sudo chmode -R 777 /home/HDD sudo&#xff1a;是linux系统管理指令&#xff0c;是允许系统管理员让普通用户执行一些或者全部的root命令的一个工具…

AI电销机器人好不好用关键是什么?

影响AI电销机器人是否好用的两个因素分别是&#xff0c;识别系统以及线路。 有很多电销企业都想找一个好用的AI电销机器人&#xff0c;可是什么样的机器人才是好用的机器人呢?有哪些因素会影响AI电销机器人好不好用呢? 添加图片注释&#xff0c;不超过 140 字&#xff08;可选…

2652. 倍数求和

2652. 倍数求和 题目方法-【枚举】 & 题目特征-【求计算在给定范围内满足某种条件的整数之和】方法-【容斥原理】 & 题目特征-【计算满足多个条件的元素之和&#xff0c;并且需要避免重复计数】 题目 题目链接&#xff1a;https://leetcode.cn/problems/sum-multiples…

华为云应用中间件DCS系列—Redis实现(视频直播)消息弹幕

云服务、API、SDK&#xff0c;调试&#xff0c;查看&#xff0c;我都行 阅读短文您可以学习到&#xff1a;应用中间件系列之Redis实现&#xff08;视频直播&#xff09;消息弹幕 1 什么是DEVKIT 华为云开发者插件&#xff08;Huawei Cloud Toolkit&#xff09;&#xff…

Linux中使用nfs共享存储

NFS是一种基于TCP/IP传输的网络文件系统协议。通过使用NFS协议&#xff0c;客户机可以像访问本地目录一样访问远程服务器中的共享资源。对于大多数负载均衡群来说&#xff0c;使用NFS协议来共享数据存储是比较常见的做法&#xff0c;NFS也是存储设备必然支持的一种协议。但是由…

ubuntu 修改磁盘名称

前言&#xff1a; 最近重装了ubuntu系统&#xff0c;硬盘名称是一个很长的数字-字母序列&#xff0c;不太好看&#xff0c;根据磁盘的内存&#xff0c;把磁盘重命名了一下。 步骤1&#xff1a;查看当前所有磁盘及分区 sudo fdisk -l 根据存储容量的大小&#xff0c;找到对应…

G语言数组和切片

数组 数组是属于同一类型的元素的集合。例如&#xff0c;整数 5、8、9、79、76 的集合形成一个数组。Go 中不允许混合不同类型的值&#xff0c;例如同时包含字符串和整数的数组。 声明 数组属于类型[n]T。n表示数组中元素的数量&#xff0c;T表示每个元素的类型。元素的数量…

如何通过内网穿透实现远程连接NAS群晖drive并挂载电脑硬盘?

文章目录 前言1.群晖Synology Drive套件的安装1.1 安装Synology Drive套件1.2 设置Synology Drive套件1.3 局域网内电脑测试和使用 2.使用cpolar远程访问内网Synology Drive2.1 Cpolar云端设置2.2 Cpolar本地设置2.3 测试和使用 3. 结语 前言 群晖作为专业的数据存储中心&…