A collection of MapReduce (MR) case studies


1.Partitioner

1.Concept

1. The partitioner takes the key-value pairs emitted by the map phase, groups them by key, and distributes them to the different reduce tasks.

The default distribution rule is: key.hashCode() % numReduceTasks.

2. Partition numbers start at 0, and the number of partitions equals the number of reduce tasks configured for the job (a sketch of the default partitioner follows).
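For reference, Hadoop's built-in default partitioner (HashPartitioner) implements essentially the rule above. The following is a simplified sketch, not the library source; the class name is our own:

import org.apache.hadoop.mapreduce.Partitioner;

public class DefaultStyleHashPartitioner<K, V> extends Partitioner<K, V> {
    // Hash the key, mask off the sign bit so the result is non-negative,
    // then take it modulo the number of reduce tasks.
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}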

2.Example

class MyPartition extends Partitioner<Text, IntWritable> {
    /**
     * Custom partitioning: numeric keys go to partition 0, everything else to partition 1.
     */
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        try {
            Integer.parseInt(key.toString());
            return 0;
        } catch (Exception e) {
            return 1;
        }
    }
}

In the main method:

// Set the partitioner class
job.setPartitionerClass(MyPartition.class);
// Set the number of reduce tasks (i.e. partitions)
job.setNumReduceTasks(2);

2.Combiner (local aggregation)

1.Concept

1. A Combiner runs on the node where each map task runs. It pre-aggregates the map output and reduces the amount of data transferred over the network.

2. A combiner may only be used when it does not change the final business logic. Its output key-value types must match the reducer's input key-value types, and its input key-value types are the same as the map output key-value types.

3. A combiner is written by extending the Reducer class.

2.Example

class MyCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int num = 0;
        Text text = new Text();
        // Iterate over the values once, counting them and keeping the last one
        for (Text value : values) {
            num++;
            text = value;
        }
        context.write(key, new Text(text.toString() + ":" + num));
    }
}

In the main method:

job.setCombinerClass(MyCombiner.class);

3.Grouping comparator

1.Usage

1. Define a custom grouping rule so that records the rule considers equal end up in the same reduce group.

For example, use the name field of a student record as the grouping criterion.

2. Extend WritableComparator.

2.Example

public class ItemidGroupingComparator extends WritableComparator {

    protected ItemidGroupingComparator() {
        // Must call the parent constructor so instances of OrderBean can be created
        super(OrderBean.class, true);
    }

    // Custom grouping rule
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;
        // Beans with the same item_id are treated as equal and therefore grouped together
        return abean.getItemid().compareTo(bbean.getItemid());
    }
}

Note how the OrderBean class is written:

public class OrderBean implements WritableComparable<OrderBean> {

    private Text itemid;
    private DoubleWritable amount;

    public OrderBean() {
    }

    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);
    }

    public void set(Text itemid, DoubleWritable amount) {
        this.itemid = itemid;
        this.amount = amount;
    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    @Override
    public int compareTo(OrderBean o) {
        int cmp = this.itemid.compareTo(o.getItemid());
        if (cmp == 0) {
            // Same item: sort by amount, largest first
            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        String readUTF = in.readUTF();
        double readDouble = in.readDouble();
        this.itemid = new Text(readUTF);
        this.amount = new DoubleWritable(readDouble);
    }

    @Override
    public String toString() {
        return itemid.toString() + "\t" + amount.get();
    }
}

In the main method:

job.setGroupingComparatorClass(ItemidGroupingComparator.class);

4.Serialization

1.Usage

1. A custom bean is usually transmitted as the key, which allows a custom sort rule and serialized transfer.

2. Implement the WritableComparable interface, which combines the serialization (Writable) and comparison (Comparable) interfaces.

3. Why not use JDK serialization?

Because JDK serialization carries a lot of extra information (checksums, headers, the whole class inheritance hierarchy, and so on), which makes it inefficient to transmit over the network.

4. Remember to update the map output key class in the job configuration accordingly.

2.Example

class MyWord implements WritableComparable<MyWord> {

    private int count;
    private String word;

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    // Custom sort rule: descending by count, then descending by word
    public int compareTo(MyWord o) {
        if (o.count < this.count) {
            return -1;
        } else if (o.count > this.count) {
            return 1;
        } else {
            if (o.word.compareTo(this.word) < 0) {
                return -1;
            } else if (o.word.compareTo(this.word) > 0) {
                return 1;
            } else {
                return 0;
            }
        }
    }

    // write and readFields must handle the fields in the same order and with matching
    // methods, otherwise serialization/deserialization will fail
    public void write(DataOutput out) throws IOException {
        out.writeInt(count);
        out.writeUTF(word);
    }

    public void readFields(DataInput in) throws IOException {
        this.count = in.readInt();
        this.word = in.readUTF();
    }

    @Override
    public String toString() {
        return "MyWord{" + "count=" + count + ", word='" + word + '\'' + '}';
    }
}

5.Deduplication

Use the field (or bean) to be deduplicated as the map output key; identical keys collapse into one reduce group, and the reducer emits each key once. A minimal sketch follows.
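The sketch below assumes each input line is one record to deduplicate; the class names are illustrative:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class Dedup {

    public static class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The record itself is the key; duplicates collapse into one group in the shuffle
            context.write(value, NullWritable.get());
        }
    }

    public static class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // All duplicates arrive in the same group; write the key exactly once
            context.write(key, NullWritable.get());
        }
    }
}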

6.Secondary sort

1.Problem

Output the wordcount result in this form:

9  haha
5  gege
5  ahah
3  keke

The recommended approach is two jobs. The first job produces the ordinary wordcount output:

ahah  5
gege  5
haha  9
keke  3

The second job then sorts that output with a custom key class; a sketch of the second job's mapper follows.
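A minimal sketch of the second job's mapper, reusing the MyWord bean from the serialization section as the composite key. It assumes the first job's output lines are tab-separated, e.g. "haha<TAB>9"; the class name is illustrative:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<LongWritable, Text, MyWord, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input line from the first job, e.g. "haha\t9"
        String[] fields = value.toString().split("\t");
        MyWord out = new MyWord();
        out.setWord(fields[0]);
        out.setCount(Integer.parseInt(fields[1]));
        // The shuffle sorts map output keys with MyWord.compareTo (count descending),
        // which yields the ordering required above.
        context.write(out, NullWritable.get());
    }
}

With a single reduce task, the reducer only has to write each key back out to obtain the globally sorted result.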

7.Top-N

1.Approach

1. In the reducer, use a TreeSet as the container holding the top-n values (a sketch follows this list).

2. Alternatively, use two jobs: the first sorts, the second takes the top values.
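A minimal sketch of the TreeSet idea, reusing the MyWord bean from the serialization section; the value of N, the (word, count) input types and the class name are assumptions:

import java.io.IOException;
import java.util.TreeSet;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private static final int N = 3; // assumed value of n
    // MyWord.compareTo orders by count descending, so the smallest element sits last
    private final TreeSet<MyWord> topN = new TreeSet<>();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        MyWord w = new MyWord();
        w.setWord(key.toString());
        w.setCount(sum);
        topN.add(w);
        if (topN.size() > N) {
            topN.pollLast(); // evict the entry with the currently smallest count
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit the surviving top-N, largest count first
        for (MyWord w : topN) {
            context.write(new Text(w.getWord()), new IntWritable(w.getCount()));
        }
    }
}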

8.Chaining multiple jobs

1.Example

ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());

// Set up the dependencies between the jobs
cJob2.addDependingJob(cJob1);
cJob3.addDependingJob(cJob2);

JobControl jobControl = new JobControl("RecommendationJob");
jobControl.addJob(cJob1);
jobControl.addJob(cJob2);
jobControl.addJob(cJob3);

cJob1.setJob(job1);
cJob2.setJob(job2);
cJob3.setJob(job3);

// Run the JobControl in a new thread, then wait for all jobs to finish
Thread jobControlThread = new Thread(jobControl);
jobControlThread.start();
while (!jobControl.allFinished()) {
    Thread.sleep(500);
}
jobControl.stop();
return 0;

9.Map-side join

1.Usage

Load the small file into memory on the map side, then stream through the large file and join each record against the in-memory table.

2.Example

public class TestDistributedCache {

    static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text> {

        FileReader in = null;
        BufferedReader reader = null;
        HashMap<String, String> b_tab = new HashMap<String, String>();
        String localpath = null;
        String uirpath = null;

        // Called once when the map task is initialized
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // These calls fetch the local absolute path of the cache file (useful for verification)
            Path[] files = context.getLocalCacheFiles();
            localpath = files[0].toString();
            URI[] cacheFiles = context.getCacheFiles();

            // Using the cached file: read it with plain local IO.
            // This is a small file in the local working directory of the machine running the map task.
            in = new FileReader("b.txt");
            reader = new BufferedReader(in);
            String line = null;
            while (null != (line = reader.readLine())) {
                String[] fields = line.split(",");
                b_tab.put(fields[0], fields[1]);
            }
            IOUtils.closeStream(reader);
            IOUtils.closeStream(in);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Here we read the input split this map task is responsible for (on HDFS)
            String[] fields = value.toString().split("\t");
            String a_itemid = fields[0];
            String a_amount = fields[1];
            String b_name = b_tab.get(a_itemid);

            // Output, e.g.:  1001    98.9    banan
            context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" + b_name));
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(TestDistributedCache.class);
        job.setMapperClass(TestDistributedCacheMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Path of the normal input data to process
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // No reducer needed
        job.setNumReduceTasks(0);

        // Distribute a file to each task's working directory
        job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));

        // Distribute an archive to each task's working directory
//      job.addArchiveToClassPath(archive);
        // Distribute a jar to each task node's classpath
//      job.addFileToClassPath(jarfile);

        job.waitForCompletion(true);
    }
}

10.Reduce-side join

1.Overview

1. In the mapper, prepend a tag (for example the input file name or another custom field) to each value, so the reducer can tell which table a record came from.

2. In the reducer, create one collection per tag to hold the values carrying that tag.

3. Iterate over the two collections and write out the joined records; this can be done per key in reduce, or in cleanup.

2.Example

public class MovieMR1 {

    public static void main(String[] args) throws Exception {
        Configuration conf1 = new Configuration();
        /*conf1.set("fs.defaultFS", "hdfs://hadoop1:9000/");
        System.setProperty("HADOOP_USER_NAME", "hadoop");*/
        FileSystem fs1 = FileSystem.get(conf1);

        Job job = Job.getInstance(conf1);
        job.setJarByClass(MovieMR1.class);
        job.setMapperClass(MoviesMapper.class);
        job.setReducerClass(MoviesReduceJoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inputPath1 = new Path("D:\\MR\\hw\\movie\\input\\movies");
        Path inputPath2 = new Path("D:\\MR\\hw\\movie\\input\\ratings");
        Path outputPath1 = new Path("D:\\MR\\hw\\movie\\output");
        if (fs1.exists(outputPath1)) {
            fs1.delete(outputPath1, true);
        }
        FileInputFormat.addInputPath(job, inputPath1);
        FileInputFormat.addInputPath(job, inputPath2);
        FileOutputFormat.setOutputPath(job, outputPath1);

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class MoviesMapper extends Mapper<LongWritable, Text, Text, Text> {

        Text outKey = new Text();
        Text outValue = new Text();
        StringBuilder sb = new StringBuilder();

        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            // Tag each record with the name of the file it came from
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String name = inputSplit.getPath().getName();
            String[] split = value.toString().split("::");
            sb.setLength(0);
            if (name.equals("movies.dat")) {
                // 1 :: Toy Story (1995) :: Animation|Children's|Comedy
                // Fields: movieId :: movieName :: movieType
                outKey.set(split[0]);
                StringBuilder append = sb.append(split[1]).append("\t").append(split[2]);
                String str = "movies#" + append.toString();
                outValue.set(str);
                context.write(outKey, outValue);
            } else {
                // 1 :: 1193 :: 5 :: 978300760
                // Fields: userId :: movieId :: rating :: timestamp
                outKey.set(split[1]);
                StringBuilder append = sb.append(split[0]).append("\t").append(split[2]).append("\t").append(split[3]);
                String str = "ratings#" + append.toString();
                outValue.set(str);
                context.write(outKey, outValue);
            }
        }
    }

    public static class MoviesReduceJoinReducer extends Reducer<Text, Text, Text, Text> {

        // Holds: movieId, movieName, movieType
        List<String> moviesList = new ArrayList<>();
        // Holds: movieId, userId, rating, timestamp
        List<String> ratingsList = new ArrayList<>();
        Text outValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Split the values into the two lists according to their tag
            for (Text text : values) {
                if (text.toString().startsWith("movies#")) {
                    moviesList.add(text.toString().split("#")[1]);
                } else if (text.toString().startsWith("ratings#")) {
                    ratingsList.add(text.toString().split("#")[1]);
                }
            }
            // Cross-join the two lists
            for (int i = 0; i < moviesList.size(); i++) {
                for (int j = 0; j < ratingsList.size(); j++) {
                    outValue.set(moviesList.get(i) + "\t" + ratingsList.get(j));
                    // Final output: movieId, movieName, movieType, userId, rating, timestamp
                    context.write(key, outValue);
                }
            }
            moviesList.clear();
            ratingsList.clear();
        }
    }
}

11.Compressed input and output

1.Map side

1.Configuration parameters

In the configuration file:

mapreduce.output.fileoutputformat.compress=false
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
mapreduce.output.fileoutputformat.compress.type=RECORD

In code:

Job job = Job.getInstance(conf);
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(""));
2.Reading compressed input

Hadoop's built-in InputFormat classes support compressed input out of the box. For example, the record reader used by TextInputFormat (LineRecordReader) handles it in its initialize method:

public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);

    // Create the codec that matches the file's extension
    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null != codec) {
        isCompressedInput = true;
        decompressor = CodecPool.getDecompressor(codec);
        // Check whether the codec is splittable
        if (codec instanceof SplittableCompressionCodec) {
            final SplitCompressionInputStream cIn =
                ((SplittableCompressionCodec) codec).createInputStream(
                    fileIn, decompressor, start, end,
                    SplittableCompressionCodec.READ_MODE.BYBLOCK);
            // Splittable codec: read the compressed data with a CompressedSplitLineReader
            in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes);
            start = cIn.getAdjustedStart();
            end = cIn.getAdjustedEnd();
            filePosition = cIn;
        } else {
            // Non-splittable codec: wrap the file stream in a decompressing stream
            // and hand it to an ordinary SplitLineReader
            in = new SplitLineReader(codec.createInputStream(fileIn, decompressor),
                    job, this.recordDelimiterBytes);
            filePosition = fileIn;
        }
    } else {
        fileIn.seek(start);
        // Not compressed: read the data with an ordinary SplitLineReader
        in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
        filePosition = fileIn;
    }
}
3.Output

In code:

conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);

In the configuration file:

mapreduce.map.output.compress=false
mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec

2.Reduce side

1.Output

In code:

Job job = Job.getInstance(conf);
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(""));

In the configuration file:

mapreduce.output.fileoutputformat.compress=false
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
mapreduce.output.fileoutputformat.compress.type=RECORD

12.Inverted index

1.Problem

Suppose there are three files:

spark:hive is is best

hadoop:hadoop is best

hive:hive is is best

The expected output is:

best: spark 1 hadoop 1 hive 1

is:spark 2 hadoop 1 hive 2

hive:spark 1 hive 1

hadoop:hadoop 1

In the map phase, split the line content and emit each word as the key with the file name as the value. In the reducer, collect the values for each word in a list and write the result out (for example in cleanup). A minimal sketch follows.
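The sketch below aggregates per word directly in reduce rather than in cleanup; the class names and the "word  file count ..." output layout are illustrative assumptions:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndex {

    public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final Text word = new Text();
        private final Text file = new Text();

        @Override
        protected void setup(Context context) {
            // Tag every word with the name of the file it came from
            file.set(((FileSplit) context.getInputSplit()).getPath().getName());
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String w : value.toString().split("\\s+")) {
                word.set(w);
                context.write(word, file);
            }
        }
    }

    public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Count occurrences of this word per file
            Map<String, Integer> perFile = new HashMap<>();
            for (Text v : values) {
                perFile.merge(v.toString(), 1, Integer::sum);
            }
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, Integer> e : perFile.entrySet()) {
                sb.append(e.getKey()).append(" ").append(e.getValue()).append(" ");
            }
            context.write(key, new Text(sb.toString().trim()));
        }
    }
}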

13.Custom InputFormat

1.Case 1

Read whole small files, one file per record.

Custom InputFormat:

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    // Mark each small file as non-splittable so that one file produces exactly one key-value pair
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

Custom RecordReader:

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // Read the whole file into a single BytesWritable value
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // do nothing
    }
}

Define the MapReduce job:

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {

    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

        private Text filenameKey;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Use the path of the current small file as the output key
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: combinefiles <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "combine small files to sequencefile");
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);
        // Input/output paths from the command line, per the usage message above
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);
        System.exit(exitCode);
    }
}

2.Case 2

Read five lines at a time.

Custom InputFormat:

public class MyInputFormat extends InputFormat<LongWritable, Text> {

    public MyInputFormat() {
    }

    public static class MyInputSplit extends InputSplit implements Writable {

        private long start;
        private long end;

        public MyInputSplit() {
        }

        public MyInputSplit(long start, long end) {
            this.start = start;
            this.end = end;
        }

        public long getStart() {
            return start;
        }

        public void setStart(int start) {
            this.start = start;
        }

        public long getEnd() {
            return end;
        }

        public void setEnd(int end) {
            this.end = end;
        }

        public long getLength() throws IOException, InterruptedException {
            return this.end - this.start;
        }

        public String[] getLocations() throws IOException, InterruptedException {
            return new String[0];
        }

        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(start);
            dataOutput.writeLong(end);
        }

        public void readFields(DataInput dataInput) throws IOException {
            this.start = dataInput.readLong();
            this.end = dataInput.readLong();
        }
    }

    public List<InputSplit> getSplits(JobContext context) throws FileNotFoundException {
        List<InputSplit> list = new ArrayList<InputSplit>();
        long chunk = 5;      // lines per split
        long chunksize = 2;  // number of splits
        // Cut the data into splits: one split per map task, each holding `chunk` lines
        // (the last split runs to line 10).
        for (int i = 0; i < chunksize; i++) {
            MyInputSplit mi = null;
            if (i + 1 == chunksize) {
                mi = new MyInputSplit(i * chunk, 10);
                list.add(mi);
            } else {
                mi = new MyInputSplit(i * chunk, i * chunk + chunk);
                list.add(mi);
            }
        }
        // The collection of splits.
        return list;
    }

    public static class MyRecordReader extends RecordReader<LongWritable, Text> {

        private MyInputSplit split;
        // The lines read for this split
        private Iterator<String> iter;
        // Index within the split; starts at 0, so only this split's records are read
        private int index;
        // Offset key handed to the map (first generic type of the Mapper)
        private LongWritable k;
        // Value handed to the map (second generic type of the Mapper)
        private Text v;
        private String lines = "";

        public MyRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            super();
            initialize(split, context);
        }

        public MyRecordReader() {
        }

        // Initialize: create the key/value objects and cast the split we were given.
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            this.split = (MyInputSplit) split;
            this.k = new LongWritable();
            v = new Text();
        }

        // Read the data and pack it into this MyRecordReader's key and value.
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (this.iter == null) {
                List<String> list = new ArrayList<String>();
                BufferedReader reader = new BufferedReader(new FileReader("d://logs/demo1.txt"));
                int num = 0;
                String line = null;
                while ((line = reader.readLine()) != null) {
                    if (this.split.start <= num && num < this.split.end) {
                        list.add(line);
                    }
                    num++;
                }
                iter = list.iterator();
            }
            boolean hasNext = this.iter.hasNext();
            // Concatenate all lines of this split into a single value
            while (iter.hasNext()) {
                String line = iter.next();
                index++;
                lines += line;
            }
            this.k.set(this.split.start + index);
            this.v = new Text(lines);
            return hasNext;
        }

        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return this.k;
        }

        public Text getCurrentValue() throws IOException, InterruptedException {
            return this.v;
        }

        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }

        public void close() throws IOException {
        }
    }

    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new MyRecordReader(split, context);
    }
}

Define the MapReduce job:


public class Text3 {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        IntWritable iw = new IntWritable(1);
        Text text = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            text.set(line);
            context.write(text, NullWritable.get());
            //text.set("{name:" + name + ",age:" + age + ",sex:" + sex + "}");
            //context.write(text, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Text3.class);

        // Mapper class for this job (no reducer is configured)
        job.setMapperClass(MyMapper.class);

        // Map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setInputFormatClass(MyInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("D:\\logs\\out17"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

14.Custom OutputFormat

1.Case 1

Problem: enrich log records using a rule table stored in MySQL and write the results to different output files with a custom OutputFormat.

Utility class for loading the rule table from the database:

public class DBLoader {

    public static void dbLoader(HashMap<String, String> ruleMap) {
        Connection conn = null;
        Statement st = null;
        ResultSet res = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://hdp-node01:3306/urlknowledge", "root", "root");
            st = conn.createStatement();
            res = st.executeQuery("select url,content from urlcontent");
            while (res.next()) {
                ruleMap.put(res.getString(1), res.getString(2));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (res != null) {
                    res.close();
                }
                if (st != null) {
                    st.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<String, String>();
        DBLoader.dbLoader(map);
        System.out.println(map.size());
    }
}

Custom OutputFormat:

public class LogEnhancerOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path enhancePath = new Path("hdfs://hdp-node01:9000/flow/enhancelog/enhanced.log");
        Path toCrawlPath = new Path("hdfs://hdp-node01:9000/flow/tocrawl/tocrawl.log");

        FSDataOutputStream enhanceOut = fs.create(enhancePath);
        FSDataOutputStream toCrawlOut = fs.create(toCrawlPath);

        return new MyRecordWriter(enhanceOut, toCrawlOut);
    }

    static class MyRecordWriter extends RecordWriter<Text, NullWritable> {

        FSDataOutputStream enhanceOut = null;
        FSDataOutputStream toCrawlOut = null;

        public MyRecordWriter(FSDataOutputStream enhanceOut, FSDataOutputStream toCrawlOut) {
            this.enhanceOut = enhanceOut;
            this.toCrawlOut = toCrawlOut;
        }

        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            // The record writer decides where the data goes (HDFS in this case).
            // If the record contains "tocrawl", write it to the to-crawl list; otherwise to the enhanced log.
            if (key.toString().contains("tocrawl")) {
                toCrawlOut.write(key.toString().getBytes());
            } else {
                enhanceOut.write(key.toString().getBytes());
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (toCrawlOut != null) {
                toCrawlOut.close();
            }
            if (enhanceOut != null) {
                enhanceOut.close();
            }
        }
    }
}

Define the MapReduce job:

/**
 * This program enhances the user web-access logs produced every hour: the content-analysis
 * result of the URL in each log line is appended to the end of that line.
 */
public class LogEnhancer {

    static class LogEnhancerMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        HashMap<String, String> knowledgeMap = new HashMap<String, String>();

        /**
         * setup is called once when the map task initializes; we use it to load the external
         * knowledge base into the memory of the machine running this map task.
         */
        @Override
        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
                throws IOException, InterruptedException {
            DBLoader.dbLoader(knowledgeMap);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            try {
                String url = fields[26];
                // Look up the content analysis for this URL in the knowledge base
                String content = knowledgeMap.get(url);
                // Build one of two kinds of output depending on whether a match was found
                String result = "";
                if (null == content) {
                    // Goes to the to-crawl list
                    result = url + "\t" + "tocrawl\n";
                } else {
                    // Goes to the enhanced log
                    result = line + "\t" + content + "\n";
                }
                context.write(new Text(result), NullWritable.get());
            } catch (Exception e) {
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogEnhancer.class);
        job.setMapperClass(LogEnhancerMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Register the custom output format with the job
        job.setOutputFormatClass(LogEnhancerOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Even though we use a custom OutputFormat, it extends FileOutputFormat,
        // and FileOutputFormat writes a _SUCCESS file, so an output directory is still required.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
        System.exit(0);
    }
}
