MapReduce框架原理:7.Join多种应用

news/2024/10/23 9:25:41/

Reduce Join工作原理


Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。


Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。

 Reduce Join案例实操

1)需求

order.txt 

订单数据表t_order

id

pid

amount

1001

01

1

1002

02

2

1003

03

3

1004

01

4

1005

02

5

1006

03

6

pd.txt

商品信息表

pid

pname

01

小米

02

华为

03

格力

将商品信息表中数据根据商品pid合并到订单数据表中

id

pname

amount

1001

小米

1

1004

小米

4

1002

华为

2

1005

华为

5

1003

格力

3

1006

格力

6

2)需求分析

通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联。

Reduce端表合并(数据倾斜)

1.输入数据:order.txt,pd.txt

2.预期输入数据:根据商品pid合并到订单数据表中

3.MapTask

        3.1Map中处理的事情

  •         (1)获取输入文件类型
    •         (2)获取输入数据
      •         (3)不同文件分别处理
        •         (4)封装Bean对象输出
          •        
            •         3.2默认对产品id排序
            • 4.ReduceTask
  1. 3)代码实现

  2. (1)创建商品和订单合并后的Orderpd类

package com.atguigu.mr.reducejoin;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class Orderpd implements Writable {// order表的数据private String orderId;private String pid;private Integer amount;// pd表的数据private String pname;// 区分数据来源private String title;public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getPid() {return pid;}public void setPid(String pid) {this.pid = pid;}public Integer getAmount() {return amount;}public void setAmount(Integer amount) {this.amount = amount;}public String getPname() {return pname;}public void setPname(String pname) {this.pname = pname;}public String getTitle() {return title;}public void setTitle(String title) {this.title = title;}@Overridepublic String toString() {return orderId + "\t" + pname + "\t" + amount;}/*** 序列化* @param out* @throws IOException*/public void write(DataOutput out) throws IOException {out.writeUTF(orderId);out.writeUTF(pid);out.writeInt(amount);out.writeUTF(pname);out.writeUTF(title);}/*** 反序列化* @param in* @throws IOException*/public void readFields(DataInput in) throws IOException {orderId = in.readUTF();pid = in.readUTF();amount = in.readInt();pname = in.readUTF();title = in.readUTF();}
}

(2)编写ReduceJoinMapper类

package com.atguigu.mr.reducejoin;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Orderpd> {private Text outk = new Text();private Orderpd outv = new Orderpd();private FileSplit inputSplit;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {inputSplit = (FileSplit) context.getInputSplit();}/*** 业务处理方法--> 将两个需要做关联的文件数据进行搜集* @param key* @param value* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 获取当前行数据String line = value.toString();// 切割数据String[] datas = line.split("\t");// 将当前数据封装到 Orderpd中if (inputSplit.getPath().getName().contains("order")) {// 当前数据来源于order.txt 文件  1001	01	1// 封装输出数据的keyoutk.set(datas[1]);// 封装输出数据的valueoutv.setOrderId(datas[0]);outv.setPid(datas[1]);outv.setAmount(Integer.parseInt(datas[2]));outv.setPname("");outv.setTitle("order");}else {// 当前数据来源于 pd.txt   01	小米// 封装输出数据的keyoutk.set(datas[0]);// 封装输出数据的valueoutv.setOrderId("");outv.setPid(datas[0]);outv.setAmount(0);outv.setPname(datas[1]);outv.setTitle("pd");}// 将数据写出context.write(outk, outv);}
}

(3)编写ReduceJoinReducer类

package com.atguigu.mr.reducejoin;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;public class ReduceJoinReducer extends Reducer<Text, Orderpd,Orderpd, NullWritable> {private ArrayList<Orderpd> orderList = new ArrayList<Orderpd>();private Orderpd pd = new Orderpd();/*** 核心处理方法--> 接收Map阶段整合好的数据 进行最终的join操作* @param key* @param values* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void reduce(Text key, Iterable<Orderpd> values, Context context) throws IOException, InterruptedException {// 遍历当前相同key的一组valuesfor (Orderpd orderpd : values) {// 判断当前的数据来源if(orderpd.getTitle().equals("order")){try {// 当前数据来源order.txt 文件,将当前数据管理到一个集合当中Orderpd thisOrderpd = new Orderpd();// 将当前传入Orderpd 复制到 thisOrderpdBeanUtils.copyProperties(thisOrderpd, orderpd);orderList.add(thisOrderpd);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}else {// 当前数据来源pd.txt 文件// 将当前传入Orderpd 复制到 thisOrderpdtry {BeanUtils.copyProperties(pd, orderpd);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}}}// 进行Join操作for (Orderpd op : orderList) {op.setPname(pd.getPname());// 将数据写出context.write(op, NullWritable.get());}// 清空orderListorderList.clear();}
}

 (4)编写ReduceJoinDriver类

package com.atguigu.mr.reducejoin;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class ReduceJoinDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(ReduceJoinDriver.class);job.setMapperClass(ReduceJoinMapper.class);job.setReducerClass(ReduceJoinReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Orderpd.class);job.setOutputKeyClass(Orderpd.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path("F:\\in\\reducejoin"));FileOutputFormat.setOutputPath(job, new Path("F:\\out\\reducejoin_out"));job.waitForCompletion(true);}
}

Map Join

1)使用场景

Map Join适用于一张表十分小、一张表很大的场景。

2)优点

思考:在Reduce端处理过多的表,非常容易产生数据倾斜怎么办?

Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

3)具体办法:采用DistributedCache

        1)在Mappersetup阶段,将文件读取到缓存集合中。

        2)在Driver驱动类中加载缓存。

//缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置HDFS路径
job.addCacheFile(new URI("hdfs://hadoop102:9820/cache/pd.txt"));

Map Join案例实操

1)需求:同上

2)需求分析:MapJoin适用于关联表中有小表的情形

Map端表合并案例分析(Distributedcache)

1 ) DistributedCacheDriver缓存文件

//1加载缓存数据
job.addCacheFile(new URI("file:/l/e:/cache/pd.txt"));

//2Map端join的逻辑不需要
Reduce阶段,设置ReduceTask数量为0

job.setNumReduceTasks(0);

2)读取缓存的文件数据

setup()方法中map方法中
1 获取缓存的文件1 获取一行
2 循环读取缓存文件一行2 截取
3 切割3 获取订单id
4 缓存数据到集合4 获取商品名称
5 关流5 拼接
6 写出


 


http://www.ppmy.cn/news/70422.html

相关文章

红魔品牌五周年,长出一个茂盛“电竞生态”

红魔新品来袭。 5月10日&#xff0c;红魔电竞举办宇宙新品发布会&#xff0c;向广大玩家带来了红魔8 Pro变形金刚领袖版以及氘锋系列IOT&#xff0c;银翼电竞显示器、电竞键鼠等各类融合先锋设计元素的硬核电竞装备。 一、红魔多款硬核电竞新品来袭 红魔8 Pro变形金刚领袖版…

前端综合项目-个人博客网页设计

个人博客前端部分设计 文章目录 前端综合项目-个人博客网页设计1. 预计效果2. 公共样式设计2.1 背景设计2.2 导航栏设计2.3 博客列表页和博客详情页的共同内容2.3.1 页面划分css设计2.3.2 左侧card内容2.3.3 右侧article内容 3. 博客列表页4. 博客详情页5. 博客登录页5.1 页面划…

TCP 和 UDP 协议详解

文章目录 1 概述2 TCP 协议2.1 报文格式2.2 三次握手&#xff0c;建立连接2.3 四次挥手&#xff0c;断开连接2.4 窗口机制 3 UDP 协议3.1 传输头格式 4 扩展4.1 常用端口号4.2 TCP 与 UDP 区别 1 概述 #mermaid-svg-aC8G8xwQRSdze7eM {font-family:"trebuchet ms",ve…

【利用AI刷面试题】AI:十道JavaScript面试题巩固一下知识

文章目录 1. 请说明 JS 中的闭包是什么&#xff0c;它有哪些应用场景&#xff1f;2. 请描述一下数组的遍历方式&#xff0c;如何向数组中添加元素&#xff1f;3. 如何利用JS实现一个进度条&#xff1f;4. 请阐述浮点数在 JavaScript 中的存储机制&#xff1f;5. 请简述ES6 模块…

基于html+css的图展示70

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

Shell基础学习---4、文本处理工具、综合应用案例(归档文件、发送信息)

1、文本处理工具 1.1 cut cut的工作就是“剪”&#xff0c;具体的说就是在文件中负责剪切数据用的。cut命令从文件的每一行剪切字节、字符和字段并将这些字节、字符和字段输出。 1、基本语法 cut [选项参数] filename 说明&#xff1a;默认分割符是制表符 2、选项参数说明 选…

JMX vs JFR:谁才是最强大的JVM监控利器?

大家好&#xff0c;我是小米&#xff01;今天我们来聊一聊JVM监控系统&#xff0c;特别是关于JMX和JFR的使用。你是否有过在线上应用出现性能问题时&#xff0c;无法准确获取关键指标的困扰呢&#xff1f;那么&#xff0c;不妨听听我给大家带来的解决方案。 什么是JMX 首先&a…

C# TimeSpan的使用

TimeSpan&#xff1a;表示一个时间间隔。使用实例如下&#xff1a; TimeSpan m_timeSpan new TimeSpan(10,10,10); 1、TimeSpan初始化 TimeSpan(Int32, Int32, Int32) 将 TimeSpan 结构的新实例初始化为指定的小时数、分钟数和秒数。 TimeSpan(Int32, Int32, Int32, I…