MapReduce

embedded/2024/11/14 3:00:39/

MapReduce的简介

  1. MapReduce的起源:MapReduce是由Google发表的论文提出的,最初是作为Google文件系统(GFS)的一部分。后来,这个概念被Doug Cutting引入到Hadoop项目中,形成了Hadoop分布式文件系统(HDFS)和MapReduce计算模型。

    应用举例:Google的MapReduce被用于大规模数据集的并行处理,例如网页索引和搜索算法。在Hadoop中,MapReduce用于处理和生成大规模数据集,如日志文件分析、社交网络数据分析等。

  2. MapReduce的原语:MapReduce包含两个主要的原语,即Map和Reduce。Map负责将输入数据映射成键值对,而Reduce则对具有相同键的值进行汇总。

    应用举例:在网页内容分析中,Map阶段可以提取每个网页的关键词,而Reduce阶段则可以统计每个关键词在整个数据集中出现的频率。

  3. MapReduce的工作流程:MapReduce的工作流程包括作业的提交、Map任务的执行、中间数据的Shuffle(洗牌)、Reduce任务的执行以及最终结果的输出。

    应用举例:在处理大规模的日志数据时,MapReduce的工作流程可以高效地完成数据的预处理、排序、聚合等操作。

  4. MapReduce的高可用性:文档提到了YARN(Yet Another Resource Negotiator)的ResourceManager的高可用性,这是Hadoop生态系统中用于资源管理和作业调度的组件。

    应用举例:在构建大型数据仓库时,YARN的高可用性确保了数据处理作业的连续性和稳定性,即使在节点故障的情况下也能保证数据处理的进行。

MapReduce的工作流程

主要知识点:

  1. MapReduce原语:确保相同key的数据被分配到同一个reduce任务中进行处理。

  2. 数据分块(Block):HDFS中文件被分割成多个block,每个block默认对应一个split,但可以灵活处理,一个block可以对应多个split。

  3. Map任务与Split的关系:通常情况下,一个split对应一个map任务。

  4. 执行顺序:Map任务完成后,才开始执行Reduce任务,以确保所有Map输出都可用于Reduce。

  5. 数据分区(Partition):Map输出的数据根据key进行分区,每个分区对应一个Reduce任务。分区后,数据按照key的字典顺序进行排序

  6. MapReduce的分区和排序:在Map阶段输出数据后,会进行分区(Partition)和排序(Sort),以便在Reduce阶段能够对相同键的数据进行有效处理。

    应用举例:在处理用户行为数据时,Map阶段可以按照用户ID进行分区,然后对每个用户的行为数据进行排序,以便Reduce阶段能够按用户汇总数据。

  7. Reduce任务:对每个分区的数据进行汇总和处理,通常一个分区对应一个Reduce任务,但也可以实现多个分组对一个Reduce任务。Reduce任务只拷贝属于自己分区的数据

  8. Combiner:在Map端进行数据的预合并,以减少传输到Reduce端的数据量。

  9. 缓冲区(Circular Buffer):Map输出数据首先写入环形缓冲区,当达到一定阈值后,数据被写入磁盘。

  10. Map任务和缓冲区

    • Map任务处理输入切片,并将处理结果写入环形缓冲区。
    • 缓冲区大小默认为100MB,阈值设为80%,即80MB。
  11. 排序:在写入磁盘前,数据会根据key进行排序,如果设置了Combiner,则会先执行Combiner。

  12. 文件合并:小文件会根据分区号和key进行合并,减少磁盘I/O。

  13. 文件合并

    • 当环形缓冲区满时,数据会被写入磁盘形成溢出文件。
    • 如果有两个或更少的溢出文件,且没有设置Combiner,则不会执行合并操作。
    • 如果溢出文件大于或等于三个,会执行合并操作,如果设置了Combiner,则会执行Combiner操作。
  14. Shuffle过程:Map输出数据被传输到Reduce端的过程,包括拷贝、排序和合并。

  15. 内存与磁盘:MapReduce会根据内存和磁盘的性能差异,优化数据处理过程。

实际应用举例:WordCount程序

假设我们有一个文本文件,我们想要统计文件中每个单词出现的次数。以下是如何将上述知识点应用到WordCount程序中:

  1. Map阶段

    • 读取文本文件,将其分割成多个block。
    • 每个block对应一个split,启动一个map任务。
    • Map任务将每一行文本拆分成单词,生成键值对(单词,1),并写入环形缓冲区。
  2. Partition阶段

    • Map输出的数据根据单词(key)进行分区。
  3. Shuffle过程

    • 当环形缓冲区达到阈值时,数据被写入磁盘,并根据分区号进行排序。
    • 如果设置了Combiner,Map端会预先合并相同单词的计数。
  4. Reduce阶段

    • 每个分区的数据被传输到对应的Reduce任务。
    • Reduce任务接收到数据后,对同一分区内的单词进行汇总,生成最终的单词计数。
  5. 优化

    • 通过设置合并因子,减少磁盘I/O,优化数据处理过程。
    • 使用Combiner减少传输到Reduce端的数据量。

通过这个过程,WordCount程序能够高效地统计出文本文件中每个单词的出现次数,并将结果输出到一个新的文件中。

资源管理与调度架构

  1. 资源管理与调度架构的角色

    • Resource Manager (资源管理器):负责整个集群资源的管理和调度。
    • Node Manager (节点管理器):在每个节点上运行,管理该节点的资源使用情况和任务执行。
  2. 作业执行流程

    • 客户端向 Resource Manager 发起作业执行申请。
    • Resource Manager 根据 Node Manager 汇报的资源使用情况,决定在哪个节点上启动 Application Master。
    • Application Master 负责该作业的任务调度、监控和容错。
  3. 单点故障与高可用性

    • Resource Manager 存在单点故障问题,可以通过设置多个节点实现高可用性。
  4. 资源分配与监控

    • Node Manager 监控容器资源使用情况,如超出申请资源,相关进程会被终止。
  5. YARN 架构

    • YARN (Yet Another Resource Negotiator) 是 Hadoop 2.0 引入的资源管理系统,将资源管理和任务调度功能分离。
    • ResourceManager 负责集群资源的管理和调度。
    • NodeManager 上的 Application Master 负责特定应用程序的任务调度和监控。
  6. MapReduce 在 YARN 上的执行

    • MapReduce 作业直接在 YARN 上运行,由 Application Master 负责任务的切分、调度、监控和容错。
    • 每个 MR 作业对应一个 MR Application Master。
  7. 任务执行与容错

    • 如果 Application Master 失败,YARN 会重新启动它,并重新申请资源继续执行。

实际应用举例说明:

假设有一个大数据分析公司,他们需要处理大量的数据日志来分析用户行为。他们使用 Hadoop 2.0 集群来进行数据处理。

  1. 资源管理:公司的数据科学家提交了一个 MapReduce 作业到集群,Resource Manager 根据当前集群的资源使用情况,决定在哪个节点上启动 Application Master。

  2. 任务调度:Application Master 启动后,它将 MapReduce 作业分解为多个任务(map 和 reduce 任务),并将这些任务调度到不同的 Node Manager 上执行。

  3. 监控与容错:在任务执行过程中,Node Manager 监控资源使用情况,确保任务不会超出资源限制。如果某个任务失败,Application Master 会重新调度该任务到其他节点上。

  4. 高可用性:如果 Resource Manager 发生故障,备用的 Resource Manager 节点会接管,保证集群资源管理的连续性。

  5. 多作业并行处理:YARN 架构允许多个 MapReduce 作业同时在集群上运行,每个作业都有自己的 Application Master,互不干扰。

通过这种方式,公司能够有效地管理和调度集群资源,同时确保作业的高可用性和容错性,从而提高数据处理的效率和可靠性。

MapReduce 执行流程笔记

1. 流程概述

  • 目的: 分析 MR (MapReduce) 作业的执行流程。
  • 流程维度: 从节点、状态节点、角色进程等角度进行分析。

2. 角色与节点

  • 客户端 (Client): 启动 MR 作业的地方。
  • Resource Manager: 管理资源分配的节点。
  • Node Managers: 多个节点,负责执行任务。

3. 作业提交与监控

  • 作业对象创建: 客户端创建 Job 对象。
  • 进度监控: 通过 waitForCompletion() 方法每秒轮询作业进度,并输出到控制台或日志。

4. 作业详细提交步骤

  • 获取作业 ID: 向 Resource Manager 申请。
  • 输出检查与分片计算: 计算作业分片,上传作业包、配置文件到 HDFS。
  • 作业提交: 通过 submitApplication() 方法提交作业给 Resource Manager。

5. 作业执行

  • 容器分配: Resource Manager 分配容器,选择空闲 Node Manager 节点。
  • MR AppMaster 启动: 在 Node Manager 上启动 MR AppMaster。
  • 作业初始化: AppMaster 初始化作业,跟踪作业进度。

6. 任务执行

  • Map & Reduce 任务: 根据 HDFS 分片信息创建任务。
  • 资源申请: 为 Map 和 Reduce 任务申请资源容器。
  • 任务启动: Map 任务先启动,Reduce 任务在 Map 完成一定量后启动。
  • 数据拷贝与合并: Reduce 任务从 Map 任务节点拷贝数据,进行合并和分组。

7. 进度反馈与作业完成

  • 进度反馈: 完成情况反馈给 AppMaster,再传给客户端。
  • 结果输出: Reduce 任务输出文件到 HDFS。

8. Yarn Child 类

  • 资源本地化: Yarn Child 类从 HDFS 获取作业相关资源到本地。
  • 任务运行: 在本地运行 Map 和 Reduce 任务。

学习建议

  • 源码学习: 通过后续的源码讲解深入理解各方法和对象的作用。
  • 流程细节: 查看文档中标注的方法和属性,加深对执行流程的理解。

高可用的ResourceManager(实践待完善)

  1. 搭建HDFS高可用:首先,你需要搭建一个高可用的HDFS集群,确保数据存储的可靠性。

  2. 配置ResourceManager:在两台服务器上配置ResourceManager,一台作为active,另一台作为standby,通过ZooKeeper来管理它们的切换。

  3. 编辑配置文件

    1. mapred-site.xml

    此文件用于配置MapReduce框架。

    <configuration><!-- 指定MapReduce作业由YARN管理 --><property><name>mapreduce.framework.name</name><value>yarn</value></property>
    </configuration>
    
    1. yarn-site.xml

    此文件用于配置YARN(包括ResourceManager和NodeManager)。

    <configuration><!-- YARN支持MapReduce作业的suffer服务 --><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property><!-- 启用ResourceManager的高可用性 --><property><name>yarn.resourcemanager.ha.enabled</name><value>true</value></property><!-- 指定ZooKeeper集群的地址 --><property><name>yarn.resourcemanager.zk-address</name><value>node2:2181,node3:2181,node4:2181</value></property><!-- 设置集群ID --><property><name>yarn.resourcemanager.cluster-id</name><value>cluster1</value></property><!-- 指定两个ResourceManager的名称和对应的节点 --><property><name>yarn.resourcemanager.ha.rm-ids</name><value>rm1,rm2</value></property><property><name>yarn.resourcemanager.hostname.rm1</name><value>node3</value></property><property><name>yarn.resourcemanager.hostname.rm2</name><value>node4</value></property>
    </configuration>
    

    说明

    • <configuration>标签内包含所有的配置项。
    • 每个配置项使用<property>标签,包含<name><value>子标签。
    • mapreduce.framework.name设置为yarn,表示MapReduce作业由YARN管理。
    • yarn.nodemanager.aux-services设置为mapreduce_shuffle,表示NodeManager支持MapReduce的shuffle服务。
    • yarn.resourcemanager.ha.enabled设置为true,启用ResourceManager的高可用性。
    • yarn.resourcemanager.zk-address指定了ZooKeeper集群的地址。
    • yarn.resourcemanager.cluster-id设置集群ID。
    • yarn.resourcemanager.ha.rm-ids和相关的hostname属性指定了两个ResourceManager的ID和对应的节点。
      这些配置文件需要放置在Hadoop配置目录下(通常是$HADOOP_HOME/etc/hadoop)。在应用这些更改后,通常需要重新启动YARN服务以使配置生效。
  4. 分发配置文件:将配置好的mapred-site.xmlyarn-site.xml文件分发到所有相关节点。

  5. 启动集群:编写并执行脚本来启动HDFS和ResourceManager。

  6. 测试高可用性:通过停止active的ResourceManager,验证standby的ResourceManager能否自动切换为active状态。

  7. 监控与维护:使用ZooKeeper的命令行工具来监控ResourceManager的状态,确保集群的稳定运行。

高可用集群脚本编写

1. 脚本编写目的

  • 目标: 编写高可用Hadoop集群的启动和关闭脚本,简化操作流程。

2. 脚本编写过程

  • 启动脚本编写:
    • 脚本命名:startHA.sh
    • 主要内容:
      • 循环遍历节点(NODE 3, NODE 4)。
      • 在每个节点上执行source /etc/profile以加载环境变量。
      • 启动zookeeper服务。
      • 休眠1秒,以确保服务完全启动。
      • 调用start-hdfs.sh脚本启动HDFS。
      • 调用start-yarn.sh脚本启动YARN资源管理器。
      • 检查进程状态,确保服务正常启动。
  • 关闭脚本编写:
    • 脚本命名:stopHA.sh
    • 主要内容:
      • 关闭YARN资源管理器。
      • 关闭HDFS服务。
      • 循环遍历节点,关闭所有ZooKeeper服务。

启动脚本(startHA.sh)

#!/bin/bash
# 启动Hadoop集群的HDFS和YARN服务
# 循环遍历节点
for NODE in NODE3 NODE4
do# 在每个节点上执行source命令以加载环境变量ssh $NODE "source /etc/profile"# 启动ZooKeeper服务ssh $NODE "zookeeper-server.sh start"# 等待1秒以确保服务完全启动sleep 1# 调用start-hdfs.sh脚本启动HDFSssh $NODE "hdfs --daemon start namenode"# 调用start-yarn.sh脚本启动YARN资源管理器ssh $NODE "yarn --daemon start resourcemanager"
done
# 检查进程状态,确保服务正常启动
ssh NODE3 "jps"
ssh NODE4 "jps"

关闭脚本(stopHA.sh)

#!/bin/bash
# 关闭Hadoop集群的HDFS和YARN服务
# 关闭YARN资源管理器
ssh NODE4 "yarn --daemon stop resourcemanager"
# 关闭HDFS服务
ssh NODE4 "hdfs --daemon stop namenode"
# 循环遍历节点
for NODE in NODE3 NODE4
do# 关闭ZooKeeper服务ssh $NODE "zookeeper-server.sh stop"
done
# 检查进程状态,确保服务已关闭
ssh NODE3 "jps"
ssh NODE4 "jps"

具体案例:Word Count 程序操作步骤及代码

1. 文件准备

  • 打开终端。
  • 使用 VIM 创建新文件并编辑:
    vim word_count.test
    
  • 在 VIM 中输入文本,例如:
    Hello world
    Hello Hadoop
    
  • 保存并退出 VIM:按下 Esc 键,输入 :wq,然后按 Enter

2. HDFS 操作

  • 在 HDFS 上创建目录:
    hdfs dfs -mkdir word_count_input
    
  • 将文件上传到 HDFS:
    hdfs dfs -put word_count.test word_count_input/
    

3. 执行 Word Count

  • 切换到 MapReduce 示例目录(根据实际环境调整):
    cd /path/to/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-<version>.jar
    
  • 执行 MapReduce 任务:
    hadoop jar hadoop-mapreduce-examples-<version>.jar wordcount word_count_input word_count_output
    

4. 查看结果

  • 查看输出目录:
    hdfs dfs -ls word_count_output
    
  • 查看结果文件:
    hdfs dfs -cat word_count_output/part-r-00000
    

5. 结果分析

  • 查看输出的结果文件,例如:
    Hello	2
    Hadoop	1
    World	1
    

学习建议

  • 实践上述步骤,熟悉每个命令的作用。
  • 尝试修改输入文本和文件名,观察输出结果的变化。
  • 了解 MapReduce 的工作原理,深入研究 Word Count 程序的源代码。

Word Count 的 Java 程序

实现步骤

  1. 创建 Map 类:用于处理输入的数据,将每个单词映射为 (word, 1)
  2. 创建 Reduce 类:用于将 Map 阶段的输出进行汇总,统计每个单词的数量。
  3. 配置 Job:设置输入输出路径,指定 Map 和 Reduce 类。
  4. 运行 Job:提交作业到 Hadoop 集群执行。

Java 代码示例

Map 类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();for (String token : line.split("\\s+")) {word.set(token);context.write(word, one);}}
}
Reduce 类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}
}
配置和运行 Job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(WordCountMapper.class);job.setCombinerClass(WordCountReducer.class);job.setReducerClass(WordCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

运行程序

编译并打包这个 Java 程序,然后使用 Hadoop 命令来运行它:

hadoop jar wordcount.jar WordCount /input /output

这里 /input 是输入文件的路径,/output 是输出结果的路径。

学习建议

  • 理解 MapReduce 的工作原理。
  • 练习编写 Map 和 Reduce 函数。
  • 熟悉 Hadoop 的命令行操作。

http://www.ppmy.cn/embedded/27627.html

相关文章

QQ游戏怎么在电脑上玩 最新图解Mac电脑运行QQ游戏教程 苹果电脑qq打不开怎么办 mac电脑安装windows

很多常用的软件都不提供Mac版本&#xff0c;比如&#xff1a;QQ游戏&#xff0c;大型游戏等。即使是在官网下载了软件安装包&#xff0c;下载的是exe格式。Mac系统无法直接打开&#xff0c;但是可以借助CrossOver打开exe文件。 作为mac上一款强大的windows虚拟机&#xff0c;C…

如何将API 中的excel 文件load 到 Azure blob 中

背景&#xff1a; 项目中一个API 中的excel 加密了&#xff0c;需要完成以下任务&#xff1a; 任务一&#xff1a; 将这个excel 文件从API 获取&#xff0c;解密后将数据load 进SQL SERVER 数据库 任务二&#xff1a;将这个excel 文件从API 获取后&#xff0c;解密后将数据ex…

设置UIProgressView的样式

UIProgressView是UIKit框架中的一个控件&#xff0c;用于显示任务的完成进度。你可以使用UIProgressView来展示任务的完成情况&#xff0c;例如下载文件、上传数据等。 要设置UIProgressView的样式&#xff0c;包括粗细、颜色等&#xff0c;你可以使用UIProgressView的一些属性…

Allegro orcad16.6 层叠,阻抗,差分,规则设置

PCB布局结束 再布线之用前要对pcb进行叠层处理和设计规则添加 添加4层 确定&#xff01; 改下颜色 阻抗计算 嘉立创阻抗计算 (jlc.com) 输入到规则管理器 添加完对这个网络进行驱动 添加差分对 设置好之后回到规则设置 设置DIFF100 物理规则设置完 再添加一个间距规则 设置…

多线程常见使用

Thread.currentThread().interrupt(); Thread.currentThread().interrupt(); 是Java中用于中断当前线程的一个方法调用。这一行代码的作用是设置当前正在执行这段代码的线程的中断标志位为true。在Java中&#xff0c;中断是一种协作机制&#xff0c;用于通知线程应该停止它正在…

中兴F7607P自启动程序,关闭JAVA插件

中兴F7607P自启动程序&#xff0c;关闭JAVA插件 本文目的&#xff1a;关闭光猫内自动运行的JAVA插件&#xff0c;并实现开机自动调用用户的程序启动 原文地址 移动定制版F7607P不带LXC容器&#xff0c;取而代之的是JAVA虚拟机&#xff0c;内置多个插件&#xff0c;包括名为C…

Linux系统中搭建Mosquitto MQTT服务并实现远程访问本地消息代理进行通信

文章目录 1. Linux 搭建 Mosquitto2. Linux 安装Cpolar3. 创建MQTT服务公网连接地址4. 客户端远程连接MQTT服务5. 代码调用MQTT服务6. 固定连接TCP公网地址7. 固定地址连接测试 今天和大家分享一下如何在Linux系统中搭建Mosquitto MQTT协议消息服务端,并结合Cpolar内网穿透工具…

websocket集成文档

1.添加依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId> </dependency>2.添加配置 Configuration public class WebSocketConfig {Beanpublic ServerEndpointExpo…