Reduce Join工作原理
Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
Reduce Join案例实操
1)需求
order.txt
订单数据表t_order
id | pid | amount |
1001 | 01 | 1 |
1002 | 02 | 2 |
1003 | 03 | 3 |
1004 | 01 | 4 |
1005 | 02 | 5 |
1006 | 03 | 6 |
pd.txt
商品信息表
pid | pname |
---|---|
01 | 小米 |
02 | 华为 |
03 | 格力 |
将商品信息表中数据根据商品pid合并到订单数据表中
id | pname | amount |
---|---|---|
1001 | 小米 | 1 |
1004 | 小米 | 4 |
1002 | 华为 | 2 |
1005 | 华为 | 5 |
1003 | 格力 | 3 |
1006 | 格力 | 6 |
2)需求分析
通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联。
Reduce端表合并(数据倾斜)
1.输入数据:order.txt,pd.txt
2.预期输入数据:根据商品pid合并到订单数据表中
3.MapTask
3.1Map中处理的事情
- (1)获取输入文件类型
- (2)获取输入数据
- (3)不同文件分别处理
- (4)封装Bean对象输出
-
- 3.2默认对产品id排序
- 4.ReduceTask
-
- (4)封装Bean对象输出
- (3)不同文件分别处理
- (2)获取输入数据
-
3)代码实现
-
(1)创建商品和订单合并后的Orderpd类
package com.atguigu.mr.reducejoin;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class Orderpd implements Writable {// order表的数据private String orderId;private String pid;private Integer amount;// pd表的数据private String pname;// 区分数据来源private String title;public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getPid() {return pid;}public void setPid(String pid) {this.pid = pid;}public Integer getAmount() {return amount;}public void setAmount(Integer amount) {this.amount = amount;}public String getPname() {return pname;}public void setPname(String pname) {this.pname = pname;}public String getTitle() {return title;}public void setTitle(String title) {this.title = title;}@Overridepublic String toString() {return orderId + "\t" + pname + "\t" + amount;}/*** 序列化* @param out* @throws IOException*/public void write(DataOutput out) throws IOException {out.writeUTF(orderId);out.writeUTF(pid);out.writeInt(amount);out.writeUTF(pname);out.writeUTF(title);}/*** 反序列化* @param in* @throws IOException*/public void readFields(DataInput in) throws IOException {orderId = in.readUTF();pid = in.readUTF();amount = in.readInt();pname = in.readUTF();title = in.readUTF();}
}
(2)编写ReduceJoinMapper类
package com.atguigu.mr.reducejoin;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Orderpd> {private Text outk = new Text();private Orderpd outv = new Orderpd();private FileSplit inputSplit;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {inputSplit = (FileSplit) context.getInputSplit();}/*** 业务处理方法--> 将两个需要做关联的文件数据进行搜集* @param key* @param value* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 获取当前行数据String line = value.toString();// 切割数据String[] datas = line.split("\t");// 将当前数据封装到 Orderpd中if (inputSplit.getPath().getName().contains("order")) {// 当前数据来源于order.txt 文件 1001 01 1// 封装输出数据的keyoutk.set(datas[1]);// 封装输出数据的valueoutv.setOrderId(datas[0]);outv.setPid(datas[1]);outv.setAmount(Integer.parseInt(datas[2]));outv.setPname("");outv.setTitle("order");}else {// 当前数据来源于 pd.txt 01 小米// 封装输出数据的keyoutk.set(datas[0]);// 封装输出数据的valueoutv.setOrderId("");outv.setPid(datas[0]);outv.setAmount(0);outv.setPname(datas[1]);outv.setTitle("pd");}// 将数据写出context.write(outk, outv);}
}
(3)编写ReduceJoinReducer类
package com.atguigu.mr.reducejoin;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;public class ReduceJoinReducer extends Reducer<Text, Orderpd,Orderpd, NullWritable> {private ArrayList<Orderpd> orderList = new ArrayList<Orderpd>();private Orderpd pd = new Orderpd();/*** 核心处理方法--> 接收Map阶段整合好的数据 进行最终的join操作* @param key* @param values* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void reduce(Text key, Iterable<Orderpd> values, Context context) throws IOException, InterruptedException {// 遍历当前相同key的一组valuesfor (Orderpd orderpd : values) {// 判断当前的数据来源if(orderpd.getTitle().equals("order")){try {// 当前数据来源order.txt 文件,将当前数据管理到一个集合当中Orderpd thisOrderpd = new Orderpd();// 将当前传入Orderpd 复制到 thisOrderpdBeanUtils.copyProperties(thisOrderpd, orderpd);orderList.add(thisOrderpd);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}else {// 当前数据来源pd.txt 文件// 将当前传入Orderpd 复制到 thisOrderpdtry {BeanUtils.copyProperties(pd, orderpd);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}}// 进行Join操作for (Orderpd op : orderList) {op.setPname(pd.getPname());// 将数据写出context.write(op, NullWritable.get());}// 清空orderListorderList.clear();}
}
(4)编写ReduceJoinDriver类
package com.atguigu.mr.reducejoin;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class ReduceJoinDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(ReduceJoinDriver.class);job.setMapperClass(ReduceJoinMapper.class);job.setReducerClass(ReduceJoinReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Orderpd.class);job.setOutputKeyClass(Orderpd.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path("F:\\in\\reducejoin"));FileOutputFormat.setOutputPath(job, new Path("F:\\out\\reducejoin_out"));job.waitForCompletion(true);}
}
Map Join
1)使用场景
Map Join适用于一张表十分小、一张表很大的场景。
2)优点
思考:在Reduce端处理过多的表,非常容易产生数据倾斜怎么办?
在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
3)具体办法:采用DistributedCache
(1)在Mapper的setup阶段,将文件读取到缓存集合中。
(2)在Driver驱动类中加载缓存。
//缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置HDFS路径
job.addCacheFile(new URI("hdfs://hadoop102:9820/cache/pd.txt"));
Map Join案例实操
1)需求:同上
2)需求分析:MapJoin适用于关联表中有小表的情形
Map端表合并案例分析(Distributedcache)
1 ) DistributedCacheDriver缓存文件
//1加载缓存数据
job.addCacheFile(new URI("file:/l/e:/cache/pd.txt"));
//2Map端join的逻辑不需要
Reduce阶段,设置ReduceTask数量为0
job.setNumReduceTasks(0);
2)读取缓存的文件数据
setup()方法中 | map方法中 |
1 获取缓存的文件 | 1 获取一行 |
2 循环读取缓存文件一行 | 2 截取 |
3 切割 | 3 获取订单id |
4 缓存数据到集合 | 4 获取商品名称 |
5 关流 | 5 拼接 |
6 写出 |