大数据Hadoop中MapReduce的介绍包括编程模型、工作原理(MapReduce、MapTask、ReduceTask、Shuffle工作原理)

ops/2025/1/23 2:33:10/

MapReduce概述

MapReduce是Hadoop的核心项目之一,它是一个分布式计算框架, 可用于大数据并行处理的计算模型、框架和平台,主要解决海量数据的计算,是大数据中较为熟知的分布式计算框架。

MapReduce作为分布式计算框架,其底层思想采用的是“分而治之”,所谓的“分而治之”就是把一个复杂的问题,按照一定的规则分为若干个没有依赖关系的简单问题,然后逐个解决这些简单问题,把若干个简单问题的结果组成整个复杂问题的最终结果。

为了更好地理解“分而治之”的思想,先通过一个生活中的例子进行介绍。例如,某停车场管理人员要统计一个大型停车场的停车数量,在车辆停车后不再挪动的情况下,将大型停车场划分为不同的停车区域,然后针对划分的每个区域单独进行统计,最后将每个区域的停车数量累加在一起。

在这里插入图片描述

MapReduce计算海量数据时,每个MapReduce程序被初始化为一个工作任务,这个工作任务在运行时会经历Map过程和Reduce过程。

Map过程:负责将工作任务分解为若干个相互独立的子任务,这些子任务相互独立,可以单独被执行。

Reduce过程:负责将Map过程处理完的子任务结果合并,从而得到工作任务的最终结果。

MapReduce的执行过程
在这里插入图片描述

MapReduce编程模型

MapReduce是一种编程模型,用于处理大规模数据集的并行计算。使用MapReduce执行处理大规模数据集计算任务的时候,计算任务主要经历两个过程,分别是Map过程和Reduce过程,其中Map过程用于对原始数据进行处理;Reduce过程用于对Map过程处理后的数据进行汇总,得到最终结果。

在这里插入图片描述

MapReduce的编程模型借鉴了计算机程序设计语言LISt Processing(LISP)的设计思想,提供了map()和reduce()这两个方法分别用于Map过程和Reduce过程:

  • map()方法接收格式为键值对(<Key,Value>)的数据,其中键(Key)是指每行数据的起始偏移量,也就是每行数据开头的字符所在的位置,值(Value)是指文本文件中的每行数据。使用map()方法处理后的数据,会被映射为新的键值对作为reduce()方法的输入;
  • reduce()方法默认会将每个键值对中键相同的值进行合并,当然也可以根据实际需求调整合并规则。

MapReduce简易模型的数据处理过程:

在这里插入图片描述

(1)MapReduce通过特定的规则将原始数据解析成键值对<Key1,Value1>的形式。

(2)解析后的键值对<Key1,Value1>会作为map()方法的输入,map()方法根据映射规则将<Key1,Value1>映射为新的键值对<Key2,Value2>。

(3)新的键值对<Key2,Value2>作为reduce()方法的输入,reduce()方法将具有相同键的值合并在一起,生成最终的键值对<Key3,Value3>。

在MapReduce中,对于一些数据的计算可能不需要Reduce过程,也就是说MapReduce的简易模型的数据处理过程可能只有Map过程,由Map过程处理后的数据直接输出到目标文件。但是,对于大多数数据的计算来说,都是需要Reduce过程的,并且由于数据计算繁琐,需要设定多个Reduce过程。
在这里插入图片描述
上图展示的是含有3个Map过程和2个Reduce过程的MapReduce模型,其中,由3个Map过程处理后的键值对会根据分区规则输出到不同的Reduce过程进行处理,默认情况下,分区规则是根据Map过程输出的键值对中的键的哈希值决定的。Reduce过程是最后的处理过程,其输出结果不会进行第二次合并,也就是说,不同的Reduce过程都会将处理结果输出到单独的目标文件。

为了更好地理解MapReduce编程模型,接下来,通过一个经典案例——词频统计来帮助加深对MapReduce的理解。

假设有两个文本文件test1.txt和文件test2.txt。

文件test1.txt的内容

Hello World

Hello Hadoop

Hello CHQ

文件test2.txt的内容

Hadoop MapReduce

MapReduce Spark

使用MapReduce程序统计文件test1.txt和test2.txt中每个单词出现的次数,实现词频统计的流程。

在这里插入图片描述

MapReduce工作原理

1.MapReduce工作过程

MapReduce编程模型开发简单且功能强大,专门为并行处理大规模数据量而设计。

在这里插入图片描述
上图可以看出,在MapReduce中的MapTask是实现Map过程的多个任务,ReduceTask是实现Reduce过程的任务。MapReduce的工作过程大致可以分为以下4步。

1.分片(Split)和解析原始数据

输入MapTask的原始数据必须经过分片和解析操作。分片和解析操作说明如下:

  • **分片操作:**指将原始数据文件file划分为多个数据块,每个数据块默认是128MB,即block1、block2和block3,MapReduce会为每个数据块创建一个MapTask,并且该MapTask运行的map()方法,处理数据块内的每行数据。
  • **解析操作:**将数据块内的每行数据映射为键值对的形式,其中,键表示数据块中每行数据的起始偏移量,值表示数据块中的每行数据。

2.执行MapTask

每个MapTask都有一个内存缓冲区,默认缓冲区的100MB,输入的键值对经过MapTask处理后,会将中间结果暂时写入内存缓冲区。如果写入的数据达到内存缓冲区的阈值,默认为80MB,则会启动一个线程,将内存缓冲区的数据写入磁盘,形成临时文件,数据在写入磁盘之前会进行分区和排序操作。如果MapTask产生的中间结果比较大,则会形成多个临时文件。当MapTask结束后,内存缓冲区中的数据也会写入磁盘而形成临时文件,此时多个临时文件会合并为一个文件,该文件中存放了MapTask的处理结果。

3.执行ReduceTask

每个MapTask的处理结果会根据分区规则被分配到对应的ReduceTask进行处理,ReduceTask,通过运行的reduce()方法进行逻辑处理,得到最终的键值对并输出。

4.输出数据

MapReduce会自动把每个ReduceTask的处理结果以键值对的形式写入指定目录(dictionary)的文件,即part-0,part-1和part-2。

2.MapTask工作原理

MapTask实现过程大致分为Read阶段、Map阶段、Collect阶段、Spill阶段和Combine阶段。

在这里插入图片描述

1.Read阶段:通过MapReduce内置的InputFormat组件将读取的文件进行分片处理,将每个数据块中的数据映射为键值对形式。

2.Map阶段:根据实际应用场景自定义map()方法,将Read阶段映射的键值对进行转换,并生成一系列新的键值对。

3.Collect阶段:将Map阶段输出的键值对写入内存缓冲区(Memory Buffer)。

4.Spill阶段:判断内存缓冲区中的数据是否达到指定阈值。当数据达到指定阈值时,会将内存缓冲区中的数据写入本地磁盘(Disk),形成临时文件。内存缓冲区中的数据在写入磁盘之前会进行分区和排序处理,每个分区会形成一个单独的临时文件。

5.Combine阶段:将写入本地磁盘的所有临时文件合并(Merge)成一个新的文件,对新文件进行归并排序。

3.ReduceTask工作原理

ReduceTask的工作过程主要经历Copy阶段、Merge阶段、Reduce阶段和Write阶段。

在这里插入图片描述

1.Copy阶段:从不同的MapTask复制需要处理的数据,将数据写入内存缓冲区。

2.Merge阶段:判断内存缓冲区中的数据是否达到指定阈值。当数据达到指定阈值时,会将内存缓冲区中不同MapTask获取的数据合并,合并的同时会进行排序操作,合并的结果会写入本地磁盘,形成临时文件。在合并的过程中,在本地磁盘还会产生多个临时文件,多个临时文件在传输到Reduce阶段之前会再进行一次合并和排序操作。

3.Reduce阶段:根据实际应用场景自定义reduce()方法,对Merge阶段输出的键值对进行处理。Merge阶段以组的形式将数据传输到Reduce阶段,每组数据中键值对的键相同,每组数据会调用一次reduce()方法进行处理,并生成新的键值对。

4.Write阶段:将Reduce阶段生成的新键值对输出,例如HDFS。

4.Shuffle工作原理

Shuffle是MapReduce的核心,它用来确保每个ReduceTask的输入数据都是按键排序的。它的性能高低直接决定了整个MapReduce程序的性能高低。

在这里插入图片描述
由上图可知,MapTask和ReduceTask都涉及Shuffle。

1.MapTask中的Shuffle

MapTask中的Shuffle是从Collect阶段到Combine阶段,介绍如下。

(1)MapTask处理的结果会暂时放入一个内存缓冲区,该缓冲区的默认大小是100MB,当缓冲区中的数据达到80%,即80MB时,会在本地文件系统创建一个临时文件,将内存缓冲区中的数据写入这个文件。
(2)写入磁盘之前MapReduce会根据ReduceTask的数量将数据分区,一个ReduceTask对应一个分区,这样做是为了避免有些ReduceTask分配到大量数据,而有些ReduceTask分配到很少的数据,甚至没有分配到数据的现象。
(3)对每个分区的数据进行排序,如果此时设置了Combiner,则会对排序后的结果进行合并,这样做的目的是尽可能减少写入磁盘的数据。
(4)当MapTask结束时可能有很多临时文件,这时需要将这些临时文件合并成一个已分区且已排序的文件,目的是减少ReduceTask的Copy阶段复制数据的数据量
(5)将分区中的数据输出到对应的ReduceTask。

2.ReduceTask中的Shuffle

ReduceTask中的Shuffle是从Copy阶段到Sort阶段,介绍如下。
(1)Copy阶段从不同MapTask处理结果的对应分区中复制数据,并将数据写入不同的内存缓冲区。
(2)当不同内存缓冲区中的数据达到指定阈值时,对不同内存缓冲区中的数据进行合并和排序,并写入磁盘形成临时文件。
(3)随着临时文件的增多,还会将这些临时文件合并排序,也生成一个新的文件。


http://www.ppmy.cn/ops/152346.html

相关文章

Linux内核编程(二十一)USB驱动开发-键盘驱动

一、驱动类型 USB 驱动开发主要分为两种&#xff1a;主机侧的驱动程序和设备侧的驱动程序。一般我们编写的都是主机侧的USB驱动程序。 主机侧驱动程序用于控制插入到主机中的 USB 设备&#xff0c;而设备侧驱动程序则负责控制 USB 设备如何与主机通信。由于设备侧驱动程序通常与…

redis 5.0版本和Redis 7.0.15的区别在哪里

Redis 5.0 和 Redis 7.0.15 之间存在多方面的区别&#xff0c;以下是主要差异点&#xff1a; 1. 新特性与功能 Redis 5.0&#xff1a; 引入了 Stream 数据类型&#xff0c;用于高性能、持久化和实时处理的数据流。 支持 客户端缓存&#xff0c;减少对服务器的请求&#xff0c…

kafka学习笔记6 ACL权限 —— 筑梦之路

在Kafka中&#xff0c;ACL&#xff08;Access Control List&#xff09;是用来控制谁可以访问Kafka资源&#xff08;如主题、消费者组等&#xff09;的权限机制。ACL配置基于Kafka的kafka-acls.sh工具&#xff0c;能够管理对资源的读取、写入等操作权限。 ACL介绍 Kafka的ACL是…

第17章:Python TDD回顾与总结货币类开发

写在前面 这本书是我们老板推荐过的&#xff0c;我在《价值心法》的推荐书单里也看到了它。用了一段时间 Cursor 软件后&#xff0c;我突然思考&#xff0c;对于测试开发工程师来说&#xff0c;什么才更有价值呢&#xff1f;如何让 AI 工具更好地辅助自己写代码&#xff0c;或许…

Grafana系列之面板接入Prometheus Alertmanager

关于Grafana的仪表板Dashboard&#xff0c;以及面板Panel&#xff0c;参考Grafana系列之Dashboard。可以直接在面板上创建Alert&#xff0c;即所谓的Grafana Alert&#xff0c;参考Grafana系列之Grafana Alert。除了Grafana Alert外&#xff0c;面板也可接入Prometheus Alertma…

【EdgeAI实战】(1)STM32 边缘 AI 生态系统

【EdgeAI实战】&#xff08;1&#xff09;STM32 边缘 AI 生态系统 【EdgeAI实战】&#xff08;1&#xff09;STM32 边缘 AI 生态系统 1. STM32 边缘人工智能1.1 X-CUBE-AI 扩展包1.2 STM32 AI Model Zoo1.3 ST AIoT Craft 2. STM32N6 AI 生态系统 (STM32N6-AI)2.1 STM32N6 AI 产…

2、ansible的playbook

ansible的脚本&#xff1a;playbook剧本 脚本的作用&#xff1a;复用 playbook的组成部分 1、开头 ---&#xff1a;表示是一个yaml文件&#xff0c;但是可以忽略。 2、Tasks&#xff08;任务&#xff09;&#xff1a;包含了目标主机上执行的操作&#xff0c;操作还是由模板来…

goland map学习-实践使用练习:判断存在及遍历

对于数据&#xff1a; type Person struct {Address stringAge intJob stringName string }type People map[string]Personvar per People{"1": Person{Address: "1",Age: 1,Job: "1",Name: "1",},"2&quo…