Hadoop中MapReduce过程中Shuffle过程实现自定义排序

server/2024/12/29 10:09:20/

文章目录

  • Hadoop中MapReduce过程中Shuffle过程实现自定义排序
    • 一、引言
    • 二、实现WritableComparable接口
      • 1、自定义Key类
    • 三、使用Job.setSortComparatorClass方法
      • 2、设置自定义排序器
      • 3、自定义排序器类
    • 四、使用示例
    • 五、总结

Hadoop中MapReduce过程中Shuffle过程实现自定义排序

在这里插入图片描述

一、引言

MapReduce框架中的Shuffle过程是连接Map阶段和Reduce阶段的桥梁,负责将Map任务的输出结果按照key进行分组和排序,并将相同key的数据传递给对应的Reduce任务进行处理。Shuffle过程的性能直接影响到整个MapReduce作业的执行效率。在默认情况下,Hadoop使用TotalOrderPartitioner进行排序,但有时我们需要根据特定的业务逻辑进行自定义排序。本文将介绍两种方法来实现自定义排序:实现WritableComparable接口和使用Job.setSortComparatorClass方法。下面是详细的步骤和代码示例。

二、实现WritableComparable接口

1、自定义Key类

首先,我们需要定义一个类并实现WritableComparable接口,该接口要求实现compareTo方法,用于定义排序逻辑。

package mr;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class Employee implements WritableComparable<Employee> {private int empno;private String ename;private String job;private int mgr;private String hiredate;private int sal;private int comm;private int deptno;@Overridepublic String toString(){return "Employee[empno="+empno+",ename="+ename+",sal="+sal+",deptno="+deptno+"]";}@Overridepublic int compareTo(Employee o) {// 多个列的排序:select * from emp order by deptno,sal;// 首先按照deptno排序if(this.deptno > o.getDeptno()){return 1;}else if(this.deptno < o.getDeptno()){return -1;}// 如果deptno相等,按照sal排序if(this.sal >= o.getSal()){return 1;}else{return -1;}}@Overridepublic void write(DataOutput output) throws IOException {// 序列化output.writeInt(this.empno);output.writeUTF(this.ename);output.writeUTF(this.job);output.writeInt(this.mgr);output.writeUTF(this.hiredate);output.writeInt(this.sal);output.writeInt(this.comm);output.writeInt(this.deptno);}@Overridepublic void readFields(DataInput input) throws IOException {// 反序列化this.empno = input.readInt();this.ename = input.readUTF();this.job = input.readUTF();this.mgr = input.readInt();this.hiredate = input.readUTF();this.sal = input.readInt();this.comm = input.readInt();this.deptno = input.readInt();}
}

三、使用Job.setSortComparatorClass方法

2、设置自定义排序器

除了实现WritableComparable接口外,我们还可以使用Job.setSortComparatorClass方法来设置自定义排序器。这种方法允许我们在不修改Key类的情况下实现自定义排序。

package mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class CustomSort {public static class Map extends Mapper<Object, Text, Employee, IntWritable> {private static Employee emp = new Employee();private static IntWritable one = new IntWritable(1);@Overrideprotected void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] line = value.toString().split("\t");emp.setEmpno(Integer.parseInt(line[0]));emp.setEname(line[1]);emp.setJob(line[2]);emp.setMgr(Integer.parseInt(line[3]));emp.setHiredate(line[4]);emp.setSal(Integer.parseInt(line[5]));emp.setComm(Integer.parseInt(line[6]));emp.setDeptno(Integer.parseInt(line[7]));context.write(emp, one);}}public static class Reduce extends Reducer<Employee, IntWritable, Employee, IntWritable> {@Overrideprotected void reduce(Employee key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {for (IntWritable val : values) {context.write(key, val);}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "CustomSort");job.setJarByClass(CustomSort.class);job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(Employee.class);job.setOutputValueClass(IntWritable.class);// 设置自定义排序器job.setSortComparatorClass(EmployeeComparator.class);Path in = new Path("hdfs://localhost:9000/mr/in/customsort");Path out = new Path("hdfs://localhost:9000/mr/out/customsort");FileInputFormat.addInputPath(job, in);FileOutputFormat.setOutputPath(job, out);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

3、自定义排序器类

package mr;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class EmployeeComparator extends WritableComparator {protected EmployeeComparator() {super(Employee.class, true);}@Overridepublic int compare(WritableComparable w1, WritableComparable w2) {Employee e1 = (Employee) w1;Employee e2 = (Employee) w2;// 首先按照deptno排序int deptCompare = Integer.compare(e1.getDeptno(), e2.getDeptno());if (deptCompare != 0) {return deptCompare;}// 如果deptno相等,按照sal排序return Integer.compare(e1.getSal(), e2.getSal());}
}

四、使用示例

下面是一个简单的MapReduce示例,展示了Shuffle过程在实际应用中的使用。这个示例中,我们使用了自定义的Employee类作为Key,并设置了自定义的排序器EmployeeComparator

五、总结

通过实现WritableComparable接口和使用Job.setSortComparatorClass方法,我们可以在Hadoop MapReduce过程中实现自定义排序。这两种方法提供了灵活的排序机制,允许我们根据不同的业务需求对数据进行排序处理,从而提高数据处理的效率和准确性。


版权声明:本博客内容为原创,转载请保留原文链接及作者信息。

参考文章

  • Hadoop之mapreduce数据排序案例(详细代码)
  • Java Job.setSortComparatorClass方法代码示例

http://www.ppmy.cn/server/154166.html

相关文章

服务器如何划分空间?

服务器如何划分空间&#xff1f;服务器是存储和处理数据的核心&#xff0c;如何有效地划分服务器空间则直接关系到资源的利用效率和系统的性能。无论是大型企业的数据中心&#xff0c;还是小型网站的共享主机&#xff0c;合理的空间划分都至关重要。下面是聚名网关于服务器如何…

丢失的MD5

丢失的MD5 源代码&#xff1a; import hashlib for i in range(32,127):for j in range(32,127):for k in range(32,127):mhashlib.md5()m.update(TASCchr(i)O3RJMVchr(j)WDJKXchr(k)ZM)desm.hexdigest()if e9032 in des and da in des and 911513 in des:print des 发现给…

Qt之数据库使用(十四)

Qt开发 系列文章 - QSqlDatabase-SQLite&#xff08;十四&#xff09; 目录 前言 一、SQLite 二、SQLite使用 1.添加SQL 2.创建数据库 3.定义类及相关变量 4.相关功能函数 5.用户类定义 6.用户类使用 7.效果演示 8.SQLite数据库 总结 前言 ‌Qt支持的数据库包括SQ…

如何在 Spring Boot 微服务中设置和管理多个数据库

在现代微服务架构中&#xff0c;通常需要与多个数据库交互的服务。这可能是由于各种原因&#xff0c;例如遗留系统集成、不同类型的数据存储需求&#xff0c;或者仅仅是为了优化性能。Spring Boot 具有灵活的配置和强大的数据访问库&#xff0c;可以轻松配置多个数据库。在本综…

Linux top指令

top指令概述 top 是 Linux 系统中用于实时监控系统性能和进程信息的命令&#xff0c;功能强大且灵活。它提供了系统资源的动态视图&#xff0c;包括 CPU、内存、运行中的进程等。 这个指令可以说是Linux中最基本的工具了&#xff0c;用来监视系统的实时运行状态&#xff0c;类…

算法

探索算法世界&#xff1a;从基础到前沿 一、引言 算法是计算机科学的核心&#xff0c;它为解决各种问题提供了明确的步骤和方法。无论是数据处理、人工智能还是日常软件应用&#xff0c;算法都起着关键作用。 二、基础算法 排序算法 排序算法是最常见的算法之一。例如冒泡排序&…

Rtsplive-视频流-Linux部署

系统环境&#xff1a;Ubuntu-24.04-server JDK环境&#xff1a;≥java17 一、部署rtsplive 上传rtsplive-ubuntu-x64.tar.gz至linux-Ubuntu-24.04服务器 并解压至/opt目录下 二、安装JDK 使用Java 命令&#xff0c;检测是否有安装java&#xff0c;是否大于17版本 如系统安…

遗传算法——附python实现与各方法的作用与修改

前言 遗传算法是数学建模中非常重要的一种搜索和优化算法&#xff0c;掌握遗传算法的精髓除了在竞赛中具有优势以外&#xff0c;更主要的是在解决实际问题的时候提供了一种全新的思路&#xff0c;通过将现实中的某种模式转换成算法&#xff0c;并用以解决某种问题的这种思路&a…