文章目录 一、任务目标 二、实行任务 1. 创建Maven项目 2. 添加相关依赖 3. 创建日志属性文件 4. 创建学生实体类 5. 创建学生映射器类 6. 创建学生归并器类 7. 创建学生驱动类 8. 启动学生驱动器类,查看结果
一、任务目标
1. 准备数据
创建sortstudent目录,在里面创建student.txt文件 创建/sortstudent/input目录,执行命令:hdfs dfs -mkdir -p /sortstudent/input
将文本文件student.txt,上传到HDFS的/sortstudent/input目录
二、实行任务
1. 创建Maven项目
Maven项目 - SortStudent
2. 添加相关依赖
在pom.xml文件里添加hadoop和junit依赖
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.4</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
    </dependency>
</dependencies>
3. 创建日志属性文件
在resources目录里创建log4j.properties文件
log4j.rootLogger = ERROR, stdout, logfile
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d %p [%c] - %m%n
log4j.appender.logfile = org.apache.log4j.FileAppender
log4j.appender.logfile.File = target/sortstudent.log
log4j.appender.logfile.layout = org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern = %d %p [%c] - %m%n
4. 创建学生实体类
package net. kox. mr ; import org. apache. hadoop. io. WritableComparable ; import java. io. DataInput ;
import java. io. DataOutput ;
import java. io. IOException ; public class Student implements WritableComparable < Student > { private String name; private String gender; private int age; private String phone; private String major; public String getName ( ) { return name; } public void setName ( String name) { this . name = name; } public String getGender ( ) { return gender; } public void setGender ( String gender) { this . gender = gender; } public int getAge ( ) { return age; } public void setAge ( int age) { this . age = age; } public String getPhone ( ) { return phone; } public void setPhone ( String phone) { this . phone = phone; } public String getMajor ( ) { return major; } public void setMajor ( String major) { this . major = major; } @Override public String toString ( ) { return "Student{" + "name='" + name + '\'' + ", gender='" + gender + '\'' + ", age=" + age + ", phone='" + phone + '\'' + ", major='" + major + '\'' + '}' ; } public int compareTo ( Student o) { return o. getAge ( ) - this . getAge ( ) ; } public void write ( DataOutput out) throws IOException { out. writeUTF ( name) ; out. writeUTF ( gender) ; out. writeInt ( age) ; out. writeUTF ( phone) ; out. writeUTF ( major) ; } public void readFields ( DataInput in) throws IOException { name = in. readUTF ( ) ; gender = in. readUTF ( ) ; age = in. readInt ( ) ; phone = in. readUTF ( ) ; major = in. readUTF ( ) ; }
}
5. 创建学生映射器类
在net.kox.mr里创建StudentMapper类
package net. kox. mr ; import org. apache. hadoop. io. LongWritable ;
import org. apache. hadoop. io. NullWritable ;
import org. apache. hadoop. io. Text ;
import org. apache. hadoop. mapreduce. Mapper ; import java. io. IOException ; public class StudentMapper extends Mapper < LongWritable , Text , Student , NullWritable > { @Override protected void map ( LongWritable key, Text value, Context context) throws IOException , InterruptedException { String line = value. toString ( ) ; String [ ] fields = line. split ( " " ) ; String name = fields[ 0 ] ; String gender = fields[ 1 ] ; int age = Integer . parseInt ( fields[ 2 ] ) ; String phone = fields[ 3 ] ; String major = fields[ 4 ] ; Student student = new Student ( ) ; student. setName ( name) ; student. setGender ( gender) ; student. setAge ( age) ; student. setPhone ( phone) ; student. setMajor ( major) ; context. write ( student, NullWritable . get ( ) ) ; }
}
6. 创建学生归并器类
package net.kox.mr;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Formats each student as a tab-separated output line.
 *
 * <p>Because {@code Student.compareTo} orders only by age, students with the
 * same age are grouped under one key here. The original implementation wrote
 * the key exactly once per group, silently deduplicating such records (the
 * surrounding article observes 3 records went missing). Iterating the value
 * iterator emits one line per input record instead.
 */
public class StudentReducer extends Reducer<Student, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Student key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        String studentInfo = key.getName() + "\t" + key.getGender() + "\t"
                + key.getAge() + "\t" + key.getPhone() + "\t" + key.getMajor();
        // One output line per grouped value — preserves duplicates.
        for (NullWritable ignored : values) {
            context.write(new Text(studentInfo), NullWritable.get());
        }
    }
}
7. 创建学生驱动类
在net.kox.mr包里创建StudentDriver类
package net.kox.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

/**
 * Configures and runs the student-sorting MapReduce job against
 * {@code hdfs://master:9000/sortstudent}, then prints the output files.
 */
public class StudentDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Resolve datanodes by hostname — needed when the client runs outside
        // the cluster's internal network.
        conf.set("dfs.client.use.datanode.hostname", "true");

        Job job = Job.getInstance(conf);
        job.setJarByClass(StudentDriver.class);
        job.setMapperClass(StudentMapper.class);
        job.setMapOutputKeyClass(Student.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(StudentReducer.class);
        job.setOutputKeyClass(Student.class);
        job.setOutputValueClass(NullWritable.class);

        String uri = "hdfs://master:9000";
        Path inputPath = new Path(uri + "/sortstudent/input");
        Path outputPath = new Path(uri + "/sortstudent/output");

        FileSystem fs = FileSystem.get(new URI(uri), conf);
        // Remove stale output so the job does not fail on an existing directory.
        fs.delete(outputPath, true);

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // The original ignored this return value and printed results even
        // after a failed run; propagate failure to the exit code instead.
        if (!job.waitForCompletion(true)) {
            System.err.println("Job failed");
            System.exit(1);
        }

        System.out.println("======统计结果======");
        for (FileStatus status : fs.listStatus(outputPath)) {
            // Skip marker files like _SUCCESS by name. The original skipped
            // index 0, which only works when the marker is listed first.
            if (status.getPath().getName().startsWith("_")) {
                continue;
            }
            System.out.println(status.getPath());
            FSDataInputStream in = fs.open(status.getPath());
            // 'true' makes copyBytes close the stream — the original passed
            // 'false' and never closed it, leaking the HDFS input stream.
            IOUtils.copyBytes(in, System.out, 4096, true);
        }
    }
}
8. 启动学生驱动器类,查看结果
运行StudentDriver类,确认学生信息已按照年龄降序排列,但是归并器做了一次我们不需要的去重,少了3条记录。需要修改学生归并器类,遍历值迭代器,这样就不会去重了。修改后再次运行StudentDriver类,查看完整结果。