1. Partitioner
1. Concept
1) The partitioner takes the key-value pairs emitted by the map tasks, groups them by key, and dispatches each group to a reduce task.
   The default rule is: partition = key.hashCode() % numReduceTasks.
2) Partition numbers start at 0, and there are exactly as many partitions as the number of reduce tasks you configure.
2. Example
class MyPartition extends Partitioner<Text, IntWritable> {
    /** Custom partitioning: numeric keys go to partition 0, everything else to partition 1. */
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        try {
            Integer.parseInt(key.toString());
            return 0;
        } catch (Exception e) {
            return 1;
        }
    }
}
In the main method:
// set the partitioner class
job.setPartitionerClass(MyPartition.class);
// set the number of reduce tasks (= number of partitions)
job.setNumReduceTasks(2);
2. Combiner
1. Concept
1) The combiner runs on the node where each map task runs and pre-aggregates the map output, which reduces the amount of data shipped over the network.
2) A combiner may only be used when it does not change the final business logic. Its output key-value types must match the reducer's input key-value types, and its input key-value types must match the map output key-value types.
3) A combiner is written by extending the Reducer class.
2. Example
class MyCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int num = 0;
        Text text = new Text();
        for (Text value : values) {   // iterate once: count the values and keep the last one
            num++;
            text = value;
        }
        context.write(key, new Text(text.toString() + ":" + num));
    }
}
In the main method:
job.setCombinerClass(MyCombiner.class);
3. Grouping comparator
1. Usage
1) Define a custom grouping rule so that records matching the rule end up in the same reduce group.
   For example, group records by a student's name attribute.
2) Extend WritableComparator.
2. Example
public class ItemidGroupingComparator extends WritableComparator {

    protected ItemidGroupingComparator() {
        // must call the parent constructor
        super(OrderBean.class, true);
    }

    // custom grouping rule
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;
        // beans with the same item_id are treated as equal and therefore aggregated into one group
        return abean.getItemid().compareTo(bbean.getItemid());
    }
}
Note how the OrderBean class is written:
public class OrderBean implements WritableComparable<OrderBean> {

    private Text itemid;
    private DoubleWritable amount;

    public OrderBean() {
    }

    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);
    }

    public void set(Text itemid, DoubleWritable amount) {
        this.itemid = itemid;
        this.amount = amount;
    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    @Override
    public int compareTo(OrderBean o) {
        int cmp = this.itemid.compareTo(o.getItemid());
        if (cmp == 0) {
            cmp = -this.amount.compareTo(o.getAmount());   // amount descending within the same item
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        String readUTF = in.readUTF();
        double readDouble = in.readDouble();
        this.itemid = new Text(readUTF);
        this.amount = new DoubleWritable(readDouble);
    }

    @Override
    public String toString() {
        return itemid.toString() + "\t" + amount.get();
    }
}
In the main method:
job.setGroupingComparatorClass(ItemidGroupingComparator.class);
4. Serialization
1. Usage
1) A custom bean is usually transmitted as the key, so it can carry both a custom sort order and its own serialization.
2) Implement the WritableComparable interface, which combines the serialization (Writable) and ordering (Comparable) interfaces.
3) Why not use JDK serialization? Because it carries a lot of extra information (checksums, headers, the class inheritance hierarchy, ...), which makes it inefficient to transfer over the network.
4) Do not forget to update the map output key class accordingly (job.setMapOutputKeyClass).
2. Example
class MyWord implements WritableComparable<MyWord> {

    private int count;
    private String word;

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    // custom sort rule: count descending, then word descending
    @Override
    public int compareTo(MyWord o) {
        if (o.count < this.count) {
            return -1;
        } else if (o.count > this.count) {
            return 1;
        } else {
            if (o.word.compareTo(this.word) < 0) {
                return -1;
            } else if (o.word.compareTo(this.word) > 0) {
                return 1;
            } else {
                return 0;
            }
        }
    }

    // write and readFields must serialize the fields in the same order and with matching
    // read/write methods, otherwise deserialization fails
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(count);
        out.writeUTF(word);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.count = in.readInt();
        this.word = in.readUTF();
    }

    @Override
    public String toString() {
        return "MyWord{" + "count=" + count + ", word='" + word + '\'' + '}';
    }
}
5. Deduplication
Use the record (or the fields to deduplicate on) as the key; the shuffle merges identical keys, and the reducer simply emits each key once, as sketched below.
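A minimal sketch of this, assuming whole input lines are to be deduplicated (class names are illustrative):
```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Mapper: the whole line becomes the key, the value carries nothing.
class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value, NullWritable.get());   // identical lines collapse in the shuffle
    }
}

// Reducer: emit each distinct key exactly once.
class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
```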
6. Secondary sort
1. Problem
Output the wordcount result sorted like this:
9 haha
5 gege
5 ahah
3 keke
The recommended approach is two jobs. The first job produces the ordinary wordcount output:
ahah 5
gege 5
haha 9
keke 3
The second job then re-sorts that output using a custom comparable key class (such as MyWord above); a sketch of its mapper follows.
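A minimal sketch of the second job's mapper, assuming the MyWord key class from the serialization section is available and the first job's output is tab-separated "word<TAB>count" (names are illustrative):
```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Second job: parse "word<TAB>count" and emit MyWord as the key, so the shuffle
// sorts records by MyWord.compareTo (count descending, then word).
class SortMapper extends Mapper<LongWritable, Text, MyWord, NullWritable> {
    private final MyWord outKey = new MyWord();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        outKey.setWord(fields[0]);
        outKey.setCount(Integer.parseInt(fields[1]));
        context.write(outKey, NullWritable.get());
    }
}
```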
7. Top N
1. Approach
1) In the reducer, use a TreeSet/TreeMap as a bounded container that keeps only the top N values (see the sketch below).
2) Alternatively, use two jobs: the first one sorts, the second one takes the first N records.
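A minimal sketch of the TreeMap variant, assuming the reducer receives word/count pairs and N = 3 (illustrative value); note that in this simplified version two words with the same total count overwrite each other:
```java
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Keep only the N largest counts in a sorted map and emit them in cleanup().
class TopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private static final int N = 3;                        // illustrative
    private final TreeMap<Integer, String> topN = new TreeMap<>();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        topN.put(sum, key.toString());                     // map is sorted by count ascending
        if (topN.size() > N) {
            topN.remove(topN.firstKey());                  // drop the smallest entry
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Map.Entry<Integer, String> e : topN.descendingMap().entrySet()) {
            context.write(new Text(e.getValue()), new IntWritable(e.getKey()));
        }
    }
}
```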
8. Chaining multiple jobs
1. Example
ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());

// declare the dependencies between the jobs
cJob2.addDependingJob(cJob1);
cJob3.addDependingJob(cJob2);

JobControl jobControl = new JobControl("RecommendationJob");
jobControl.addJob(cJob1);
jobControl.addJob(cJob2);
jobControl.addJob(cJob3);
cJob1.setJob(job1);
cJob2.setJob(job2);
cJob3.setJob(job3);

// run the jobs registered with JobControl in a new thread, then wait for them all to finish
Thread jobControlThread = new Thread(jobControl);
jobControlThread.start();
while (!jobControl.allFinished()) {
    Thread.sleep(500);
}
jobControl.stop();
return 0;
9. Map-side join
1. Usage
Load the small file into memory on the map side (via the distributed cache), then stream the large file through the mapper and join each record against the in-memory table.
2. Example
public class TestDistributedCache {

    static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text> {

        FileReader in = null;
        BufferedReader reader = null;
        HashMap<String, String> b_tab = new HashMap<String, String>();
        String localpath = null;

        // called once when the map task is initialized
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // these calls return the local absolute path of the cache file (useful for testing)
            Path[] files = context.getLocalCacheFiles();
            localpath = files[0].toString();
            URI[] cacheFiles = context.getCacheFiles();

            // how to use a cache file: read it with plain local IO;
            // the data read here is a small file in the local working directory on the map task's machine
            in = new FileReader("b.txt");
            reader = new BufferedReader(in);
            String line = null;
            while (null != (line = reader.readLine())) {
                String[] fields = line.split(",");
                b_tab.put(fields[0], fields[1]);
            }
            IOUtils.closeStream(reader);
            IOUtils.closeStream(in);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // this reads the input split this map task is responsible for (on HDFS)
            String[] fields = value.toString().split("\t");
            String a_itemid = fields[0];
            String a_amount = fields[1];
            String b_name = b_tab.get(a_itemid);
            // example output:  1001    98.9    banan
            context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" + b_name));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(TestDistributedCache.class);
        job.setMapperClass(TestDistributedCacheMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);   // the mapper emits Text values

        // the paths of the normal input data and the output
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // no reducer needed
        job.setNumReduceTasks(0);

        // ship a file to the working directory of each task process
        job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));
        // ship an archive to the working directory of each task process
        // job.addArchiveToClassPath(archive);
        // ship a jar to the classpath of each task node
        // job.addFileToClassPath(jarfile);

        job.waitForCompletion(true);
    }
}
10. Reduce-side join
1. Overview
1) Tag each record with its source: take the input file name (or some custom field) and concatenate it with the value as a marker.
2) In the reducer, keep one collection per marker for the values that share a key.
3) The joined output can be produced by looping over the two collections, either at the end of reduce() or in cleanup().
2. Example
public class MovieMR1 {

    public static void main(String[] args) throws Exception {
        Configuration conf1 = new Configuration();
        /*
        conf1.set("fs.defaultFS", "hdfs://hadoop1:9000/");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        */
        FileSystem fs1 = FileSystem.get(conf1);
        Job job = Job.getInstance(conf1);

        job.setJarByClass(MovieMR1.class);
        job.setMapperClass(MoviesMapper.class);
        job.setReducerClass(MoviesReduceJoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inputPath1 = new Path("D:\\MR\\hw\\movie\\input\\movies");
        Path inputPath2 = new Path("D:\\MR\\hw\\movie\\input\\ratings");
        Path outputPath1 = new Path("D:\\MR\\hw\\movie\\output");
        if (fs1.exists(outputPath1)) {
            fs1.delete(outputPath1, true);
        }
        FileInputFormat.addInputPath(job, inputPath1);
        FileInputFormat.addInputPath(job, inputPath2);
        FileOutputFormat.setOutputPath(job, outputPath1);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class MoviesMapper extends Mapper<LongWritable, Text, Text, Text> {

        Text outKey = new Text();
        Text outValue = new Text();
        StringBuilder sb = new StringBuilder();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String name = inputSplit.getPath().getName();
            String[] split = value.toString().split("::");
            sb.setLength(0);
            if (name.equals("movies.dat")) {
                // 1 :: Toy Story (1995) :: Animation|Children's|Comedy
                // fields: movie ID, movie name, movie genres
                outKey.set(split[0]);
                StringBuilder append = sb.append(split[1]).append("\t").append(split[2]);
                outValue.set("movies#" + append.toString());
                context.write(outKey, outValue);
            } else {
                // 1 :: 1193 :: 5 :: 978300760
                // fields: user ID, movie ID, rating, rating timestamp
                outKey.set(split[1]);
                StringBuilder append = sb.append(split[0]).append("\t").append(split[2]).append("\t").append(split[3]);
                outValue.set("ratings#" + append.toString());
                context.write(outKey, outValue);
            }
        }
    }

    public static class MoviesReduceJoinReducer extends Reducer<Text, Text, Text, Text> {

        // holds: movie ID, movie name, movie genres
        List<String> moviesList = new ArrayList<>();
        // holds: movie ID, user ID, rating, timestamp
        List<String> ratingsList = new ArrayList<>();
        Text outValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // iterate over the values and sort them into the matching list
            for (Text text : values) {
                if (text.toString().startsWith("movies#")) {
                    moviesList.add(text.toString().split("#")[1]);
                } else if (text.toString().startsWith("ratings#")) {
                    ratingsList.add(text.toString().split("#")[1]);
                }
            }
            // cross-join the two lists
            int moviesSize = moviesList.size();
            int ratingsSize = ratingsList.size();
            for (int i = 0; i < moviesSize; i++) {
                for (int j = 0; j < ratingsSize; j++) {
                    outValue.set(moviesList.get(i) + "\t" + ratingsList.get(j));
                    // final output: movie ID, movie name, movie genres, user ID, rating, timestamp
                    context.write(key, outValue);
                }
            }
            moviesList.clear();
            ratingsList.clear();
        }
    }
}
11. Reading and writing compressed files
1. Map
1. Parameter configuration
Set in the configuration file (these properties control compression of the job's final file output):
mapreduce.output.fileoutputformat.compress=false
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
mapreduce.output.fileoutputformat.compress.type=RECORD
Or configure it in code:
Job job = Job.getInstance(conf);
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(""));   // codec class name left as a placeholder
2. Reading compressed input
Hadoop's built-in InputFormat classes support reading compressed files out of the box. For example, the LineRecordReader used by TextInputFormat does the following in its initialize method:
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);

    // pick the compression codec based on the file name extension
    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null != codec) {
        isCompressedInput = true;
        decompressor = CodecPool.getDecompressor(codec);
        // check whether this is a splittable compression codec
        if (codec instanceof SplittableCompressionCodec) {
            final SplitCompressionInputStream cIn =
                ((SplittableCompressionCodec) codec).createInputStream(
                    fileIn, decompressor, start, end,
                    SplittableCompressionCodec.READ_MODE.BYBLOCK);
            // splittable codec: read the compressed data with a CompressedSplitLineReader
            in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes);
            start = cIn.getAdjustedStart();
            end = cIn.getAdjustedEnd();
            filePosition = cIn;
        } else {
            // non-splittable codec: wrap the file input stream in a decompressing stream
            // and hand it to an ordinary SplitLineReader
            in = new SplitLineReader(codec.createInputStream(fileIn, decompressor),
                    job, this.recordDelimiterBytes);
            filePosition = fileIn;
        }
    } else {
        fileIn.seek(start);
        // not compressed: read the data with an ordinary SplitLineReader
        in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
        filePosition = fileIn;
    }
}
3. Compressing the map output
Set in code:
conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);
Or in the configuration file:
mapreduce.map.output.compress=false
mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
2. Reduce
1. Compressing the final output
Configure in code:
Job job = Job.getInstance(conf);
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(""));   // codec class name left as a placeholder
Or in the configuration file:
mapreduce.output.fileoutputformat.compress=false
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
mapreduce.output.fileoutputformat.compress.type=RECORD
12. Inverted index
1. Problem
Suppose there are three files:
spark:  hive is is best
hadoop: hadoop is best
hive:   hive is is best
The expected output is:
best:   spark 1  hadoop 1  hive 1
is:     spark 2  hadoop 1  hive 2
hive:   spark 1  hadoop 1  hive 1
hadoop: hadoop 1
In the mapper, split the line content and emit each word as the key with the file name as the value. In the reducer, collect the values (counting occurrences per file) and emit the result; the output can also be buffered in a list and written out in cleanup(). A sketch follows.
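A minimal sketch of that approach (class names are illustrative): the mapper emits word → file name, and the reducer counts occurrences per file:
```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Mapper: emit word -> source file name
class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text outKey = new Text();
    private Text fileName;

    @Override
    protected void setup(Context context) {
        fileName = new Text(((FileSplit) context.getInputSplit()).getPath().getName());
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().split("\\s+")) {
            outKey.set(word);
            context.write(outKey, fileName);
        }
    }
}

// Reducer: count how often each word occurs in each file
class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Map<String, Integer> perFile = new HashMap<>();
        for (Text file : values) {
            perFile.merge(file.toString(), 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : perFile.entrySet()) {
            sb.append(e.getKey()).append(" ").append(e.getValue()).append("\t");
        }
        context.write(key, new Text(sb.toString().trim()));
    }
}
```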
13. Custom InputFormat
1. Example 1
Read whole small files, one file per record (e.g. to pack many small files into a SequenceFile).
Custom InputFormat:
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    // mark each small file as non-splittable, so one small file yields exactly one key-value pair
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}
Custom RecordReader:
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // read the whole file into memory as the single value of this record
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // nothing to do
    }
}
Define the MapReduce flow:
public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

        private Text filenameKey;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: combinefiles <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "combine small files to sequencefile");
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);
        System.exit(exitCode);
    }
}
2. Example 2
Read five lines at a time.
Custom InputFormat:
public class MyInputFormat extends InputFormat<LongWritable, Text> {

    public MyInputFormat() {
    }

    public static class MyInputSplit extends InputSplit implements Writable {

        private long start;
        private long end;

        public MyInputSplit() {
        }

        public MyInputSplit(long start, long end) {
            this.start = start;
            this.end = end;
        }

        public long getStart() {
            return start;
        }

        public void setStart(int start) {
            this.start = start;
        }

        public long getEnd() {
            return end;
        }

        public void setEnd(int end) {
            this.end = end;
        }

        public long getLength() throws IOException, InterruptedException {
            return this.end - this.start;
        }

        public String[] getLocations() throws IOException, InterruptedException {
            return new String[0];
        }

        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(start);
            dataOutput.writeLong(end);
        }

        public void readFields(DataInput dataInput) throws IOException {
            this.start = dataInput.readLong();
            this.end = dataInput.readLong();
        }
    }

    public List<InputSplit> getSplits(JobContext context) throws FileNotFoundException {
        List<InputSplit> list = new ArrayList<InputSplit>();
        long chunk = 5;        // lines per split
        long chunksize = 2;    // number of splits
        // build the splits: one map task per split, each split covering `chunk` (5) lines
        for (int i = 0; i < chunksize; i++) {
            MyInputSplit mi = null;
            if (i + 1 == chunksize) {
                mi = new MyInputSplit(i * chunk, 10);
                list.add(mi);
            } else {
                mi = new MyInputSplit(i * chunk, i * chunk + chunk);
                list.add(mi);
            }
        }
        // the list of splits
        return list;
    }

    public static class MyRecordReader extends RecordReader<LongWritable, Text> {

        private MyInputSplit split;
        // iterator over the lines that belong to this split
        private Iterator<String> iter;
        // index within the split; starts at 0, so each reader only consumes its own split's lines
        private int index;
        private LongWritable k;   // the key (offset), matches the first type parameter of the mapper
        private Text v;           // the value handed to the mapper, matches its second type parameter
        private String lines = "";

        public MyRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            super();
            initialize(split, context);
        }

        public MyRecordReader() {
        }

        // initialization: create the key/value objects and cast the split we were given
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            this.split = (MyInputSplit) split;
            this.k = new LongWritable();
            v = new Text();
        }

        // read the data and store it in this reader's key and value
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (this.iter == null) {
                List<String> list = new ArrayList<String>();
                BufferedReader reader = new BufferedReader(new FileReader("d://logs/demo1.txt"));
                int num = 0;
                String line = null;
                while ((line = reader.readLine()) != null) {
                    if (this.split.start <= num && num < this.split.end) {
                        list.add(line);
                    }
                    num++;
                }
                iter = list.iterator();
            }
            boolean hasNext = this.iter.hasNext();
            // concatenate all the lines of this split into a single value
            while (iter.hasNext()) {
                String line = iter.next();
                index++;
                lines += line;
            }
            this.k.set(this.split.start + index);
            this.v = new Text(lines);
            return hasNext;
        }

        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return this.k;
        }

        public Text getCurrentValue() throws IOException, InterruptedException {
            return this.v;
        }

        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }

        public void close() throws IOException {
        }
    }

    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new MyRecordReader(split, context);
    }
}
The MR driver:
public class Text3 {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        Text text = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            text.set(line);
            context.write(text, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Text3.class);

        // the mapper class this job uses (no reducer is needed)
        job.setMapperClass(MyMapper.class);

        // the map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // the final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // plug in the custom input format
        job.setInputFormatClass(MyInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("D:\\logs\\out17"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
14. Custom OutputFormat
1. Example 1
Problem: write output to custom destinations. In this example the output is split into two HDFS files, and a knowledge base is loaded from a MySQL database.
Utility class that loads the knowledge base from the database:
public class DBLoader {

    public static void dbLoader(HashMap<String, String> ruleMap) {
        Connection conn = null;
        Statement st = null;
        ResultSet res = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://hdp-node01:3306/urlknowledge", "root", "root");
            st = conn.createStatement();
            res = st.executeQuery("select url,content from urlcontent");
            while (res.next()) {
                ruleMap.put(res.getString(1), res.getString(2));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (res != null) {
                    res.close();
                }
                if (st != null) {
                    st.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<String, String>();
        DBLoader.dbLoader(map);
        System.out.println(map.size());
    }
}
Custom OutputFormat:
public class LogEnhancerOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path enhancePath = new Path("hdfs://hdp-node01:9000/flow/enhancelog/enhanced.log");
        Path toCrawlPath = new Path("hdfs://hdp-node01:9000/flow/tocrawl/tocrawl.log");
        FSDataOutputStream enhanceOut = fs.create(enhancePath);
        FSDataOutputStream toCrawlOut = fs.create(toCrawlPath);
        return new MyRecordWriter(enhanceOut, toCrawlOut);
    }

    static class MyRecordWriter extends RecordWriter<Text, NullWritable> {

        FSDataOutputStream enhanceOut = null;
        FSDataOutputStream toCrawlOut = null;

        public MyRecordWriter(FSDataOutputStream enhanceOut, FSDataOutputStream toCrawlOut) {
            this.enhanceOut = enhanceOut;
            this.toCrawlOut = toCrawlOut;
        }

        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            // this writer decides where each record goes on HDFS:
            // records tagged "tocrawl" go to the to-crawl list, everything else to the enhanced log
            if (key.toString().contains("tocrawl")) {
                toCrawlOut.write(key.toString().getBytes());
            } else {
                enhanceOut.write(key.toString().getBytes());
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (toCrawlOut != null) {
                toCrawlOut.close();
            }
            if (enhanceOut != null) {
                enhanceOut.close();
            }
        }
    }
}
The MapReduce flow:
/**
 * This program enhances the user web-access logs produced every hour: the content-analysis
 * result of the page each URL points to is appended to the end of the original log line.
 */
public class LogEnhancer {

    static class LogEnhancerMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        HashMap<String, String> knowledgeMap = new HashMap<String, String>();

        /**
         * setup() is called once when the map task initializes; we use it to load the
         * external knowledge base into the memory of the machine running this map task.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            DBLoader.dbLoader(knowledgeMap);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            try {
                String url = fields[26];
                // look up the content analysis for this URL in the knowledge base
                String content = knowledgeMap.get(url);
                // build one of two kinds of output depending on whether the lookup matched
                String result = "";
                if (null == content) {
                    // goes to the to-crawl list
                    result = url + "\t" + "tocrawl\n";
                } else {
                    // goes to the enhanced log
                    result = line + "\t" + content + "\n";
                }
                context.write(new Text(result), NullWritable.get());
            } catch (Exception e) {
                // malformed lines are skipped
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogEnhancer.class);
        job.setMapperClass(LogEnhancerMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // plug the custom output format into the job
        job.setOutputFormatClass(LogEnhancerOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // even though we use a custom output format, it extends FileOutputFormat,
        // which still writes a _SUCCESS file, so an output directory must be specified
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
        System.exit(0);
    }
}