MapReduce数据清理及案例

news/2024/12/29 3:06:46/

大数据竞赛知识点

文章目录

    • 大数据竞赛知识点
    • 一,Hive
      • 1,导入数据
      • 2,DDL(数据定义)增删改查
    • 二,数据文件解析
      • Json解析
      • GBK解析
      • 判空
      • 分区 (Partitioner)
      • 规约(Combiner)
      • 序列化和反序列化(implements Writable)
      • 排序(通过比较器(compareTo))
      • 计数器(Counter)
      • TopN
      • CSV文件忽略首行
      • 数据库操作
        • 读取数据库
          • 步骤:
            • step1:编写bean类
        • 写入数据库
            • step1:编写bean类 实现 Writable, DBWritable 接口
            • step2:根据数据库字段名重写write,readFields,write,readFields 方法
            • step3:
            • step4:
      • 2021样题思路及做法
        • 1,对手机信息做预处理
        • 2,分析得出手机销售量前三位的地区;
        • 3,分析出各品牌手机的市场占有率
        • 4,分析手机销售量前三位的品牌,这三个品牌中每个品牌销售量前三位的型号
        • 5,分析在手机品牌,手机型号,手机颜色,屏幕尺寸,CPU型号,电池,续行时间,运行内存,存储内存,销售价格等参数,最影响销量前三位的参数
      • 2020样题思路及做法
        • 1,剔除除了房源所在城市,房源地址,户型,面积,配套设施,租金,发布时间之外的其他所有附加信息;
        • 2,房源所在的城市,面积,租金等为空的记录对于后续的计算和分析基本没有帮助,也不可以用插值的方法修补租金额度,避免给后续的处理带来误导,请剔除租金为空的记录,并输出剔除的条目数量信息,截图并保存结果
        • 3,同一房源可能会出现多条记录,请把同一房源的多条记录进行合并,房源租金取多条记录的平均值,输出合并的条目数量,截图并保存结果;
        • 4,统计以城市为单位的所有房源的均价,输出结果并截取均价为前五的数据保存;
        • 5,统计以城市为单位的租房总面积,输出面积最大的前五个城市并截图保存结果;
        • 6,统计以房源城市,月份为单位的均价数据和房源数量数据,结果保存到mysql数据库中;
        • 7,使用方差来分析各个城市的放假分布情况,输出城市发展比较均衡的前五个城市输出并截图保存。
        • 8,以120平方为分界线,分成大面积房和小面积房。大面积房和小面积房的租房热度主要体现在平均单位面积租金上,若一个城市大面积住房的平均单位面积租金比小面积房的平均单位面积租金少,并且价格差距较大,则说明该城市的年轻人数量多,间接说明该城市的就业比较好,经济比较有活力。设计分析方法,并输出比较有活力的前五位城市。
          • step2
    • idea 快捷键
      • iter (增强型for循环)
      • ctrl + alt + v (生成返回值)
      • .var (生成返回值)
    • 注意的点
      • replace失效
      • 分割(空白 正则表达式)
      • 匹配非数字(空白 匹配非数字字符的字符)
      • Jar包上传 找不到主类
    • JAVA数据处理函数积累
        • startsWith() 如果字符串以指定的前缀开始
          • 参数
          • 返回值
        • String.valueOf() 将基本数据型态转换成 String 类型
        • DecimalFormat 格式化数据(小数点位数,科学计数法)
    • idea 快捷键
      • iter (增强型for循环)
      • ctrl + alt + v (生成返回值)
      • .var (生成返回值)
    • 注意的点
      • replace失效
      • 分割(空白 正则表达式)
      • 匹配非数字(空白 匹配非数字字符的字符)
      • Jar包上传 找不到主类
    • JAVA数据处理函数积累
        • startsWith() 如果字符串以指定的前缀开始
          • 参数
          • 返回值
        • String.valueOf() 将基本数据型态转换成 String 类型
        • DecimalFormat 格式化数据(小数点位数,科学计数法)

一,Hive

本质上是将HQL转化成MapReduce程序

1,导入数据

//本地数据
load data local inpath '数据路径' in table 表名(hive);
//hdfs数据
load data inpath '数据路径(hdfs)'into table 表名(hive);

2,DDL(数据定义)增删改查

// 创建数据库
hive > create database db_hive;
//创建数据表

二,数据文件解析

Json解析

案例:Test4–>CleanMap

Mapper文件:
JSONObject jsonObject = JSONObject.parseObject(line);得到数据使用:String[] data = new String[?];//生成数组存储数据data[0]=jsonObject.getString("json键名"); //得到数据并放进数组里存放pom文件依赖:
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.28</version></dependency>转换输出   
String end = "";
for (String item: data){end = end + item + "\t";}

GBK解析

// 防止乱码
String line = new String(value.getBytes(), 0, value.getLength(), "GBK");

判空

if (字段名==null || 字段名.trim().isEmpty() || 字段名.equals("")) {return;}

分区 (Partitioner)

案例:Test4–>CleanMap_Partitioner

Partitioner
作用:将MapReduce得出的结果根据某种分类方式分到不同的结果文件中,同一个订单分到同一个reduce

使用方法:
创建一个类去继承hadoop的Partitioner类,并重写getPartition方法。
需要在主方法中声明

job.setPartitionerClass("Partitioner类.class");//指定分区类,不指定就使用默认分区方法job.setNumReduceTasks(2);//指定多少个结果文件,根据你的分类方式进行分配,两种结果就写2,n种结果就写n
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;public class MyPartitioner extends Partitioner<Text, NullWritable>{@Overridepublic int getPartition(Text text, NullWritable nullWritable, int i) {String result = text.toString().split("\t")[5];if (Double.parseDouble(result) >= 100) {return 1;} else {return 0;}}}

规约(Combiner)

combiner相当于Reducer,是优化MapReduce的一种方式

combiner的意义是对每一个maptask的输出进行局部汇总,以减小网络传输量

实现步骤

​ 1,自定义一个combiner继承Reducer,重写reduce方法

​ 2,在job main中设置 job.setCombinerClass(CustomCombiner.class)

combiner能够应用的前提是不能影响最终的业务逻辑,而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来

序列化和反序列化(implements Writable)

在MapReduce中创建对象类时(JavaBean)因为map与reducer之间需要网络连接,所以需要对对象属性序列化和反序列化。

实现步骤

​ 1,实现Writable接口,重写write(序列化方法),readFields(反序列化方法)

​ 2,在job main中设置 job.setOutputValueClass(对象类.class);

	public class FlowBean implements Writable {private Integer price; // 一个public Integer getPrice() {return price;}public void setPrice(Integer price) {this.price = price;}@Overridepublic String toString() {return  price + "";}// 序列化方法@Overridepublic void write(DataOutput dataOutput) throws IOException {// 有多少个变量就写多少个,需根据其数据类型dataOutput.writeInt(price);	// 一个}// 反序列化方法@Overridepublic void readFields(DataInput dataInput) throws IOException {// 有多少个变量就写多少个,需根据其数据类型this.price = dataInput.readInt();	// 一个}
}jobmain: 
job.setOutputValueClass(FlowBean.class);

排序(通过比较器(compareTo))

案例:Test4–>CleanMapSort

使用场景:需要同时比较两个以上的对象时

注:若为单独设计的比较器类,则需要在jobmain(主函数)中声明该比较器的类的名称

job.setSortComparatorClass(比较类名称.class);

使用步骤:

​ 1,创建一个对象类实现接口 WritableComparable<> 源码如下:

​ 2,重写compareTo方法(实现比较器,指定排序的规则)

@Public
@Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
// 实现比较器,指定排序的规则@Overridepublic int compareTo(SortBean sortBean) {// 先对城市h_city进行排序:city排序int result = this.h_city.compareTo(sortBean.h_city);		// 城市相同的根据h_price房价排序:price排序if(result == 0){return this.h_price - sortBean.h_price;}return result;}

计数器(Counter)

实现效果如下

MR_COUNTER删除的记录数为=108

使用步骤:

1,新建一个counter对象,并且指定计数类型和计数器名字

Counter counter = context.getCounter("MR_COUNTER", "删除的记录数为");,

2,在想要计数的区域,进行计数

counter.increment(1L);

案例:Clean_test4–>A–>map

该案例目标:将字段中为空的数据删除,并打印输出删除条目数

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] data = line.split("\t");Counter counter = context.getCounter("MR_COUNTER", "删除的记录数为");if (data[6].trim().isEmpty() || data[6].equals("NULL") || data[10].trim().isEmpty() || data[10].equals("NULL")|| data[11].trim().isEmpty() || data[11].equals("NULL")){counter.increment(1L);return;}context.write(new Text(line),new Text(""));
}

TopN

N的控制在reducer

int i = 0;for (Text value : values) {context.write(value,NullWritable.get());i++;if (i >= N){    // TopN所在break;}}

方法二:reduce----->Cleanup

Map<String,Double> map = new HashMap<>();map.put();Override
protected void cleanup(Context context) throws IOException, InterruptedException {Linkedlist<String,Double> list = new Linkedlist<>(map.entrySet());Collections.sort(list,new Comparator<Map.Entry<String,Double>>{@Overridepublic int compare(Map.Entry<String, Double> o1, Map.Entry<String, Double> o2) {return o1.getValue().compareTo(o2.getValue()) * -1;}
});for (int i = 0; i < 5; i++) {key.set(list.get(i).getKey();value.set(String.valueOf(list.get(i).getValue()));context.write(key,value);}

CSV文件忽略首行

根据首行偏移量为0,去掉首行

if (key.toString().equals("0")){return;
}

数据库操作

读取数据库

DBInputFormat类

  • DBInputFormat类用于从SQL表读取数据。底层一行一行读取表中的数据,返回<k,v>键值对

    其中k是LongWritable类型,表中数据的记录行号,从0开始

    其中v是DBWritable类型,表示该行数据对应的对象类型

DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver","jdbc:mysql://node1.itcase.cn:3306/itcast_shop","root","hadoop"
);
步骤:
step1:编写bean类

用于封装查询返回的结果(如果要查询表的所有字段,那么属性就跟表的字段一一对应即可)。

需要实现setter,getter,toString,构造方法。

需要继承序列化接口Writable,和数据库接口DBWritable

写入数据库

step1:编写bean类 实现 Writable, DBWritable 接口
step2:根据数据库字段名重写write,readFields,write,readFields 方法

数据库创建时建表语句添加 engine=innodb default charset=utf8

:防止中文字符乱码或者问号

step3:
DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver","jdbc:mysql://localhost:3306/dd2019?useUnicode=true&characterEncoding=utf8","root","passwd"
);
step4:
DBOutputFormat.setOutput(job,"table3_1",			:数据表"province", "city", "hotel_num", "room_num"		:插入数据字段名
);

2021样题思路及做法

根据“京东手机.txt”文件进行处理,部分数据如下

1.00E+13	Apple	Apple iPhone 11	"['白色', '绿色', '红色', '紫色', '黑色', '黄色']"	6.1英寸	CPU型号:其他	其他	其他	电池不可拆卸	1200万像素	1200万像素	Nano SIM	9月	4739	7.9万
67415794709	华为(HUAWEI)	荣耀Play4T	"['幻夜黑', '蓝水翡翠', '极光蓝']"	 	CPU型号:其他	6GB	128GB	电池不可拆卸	其他	其他	以官网信息为准	4月	1398	1.9万
1.00E+13	Apple	苹果 iPhone 11	"['红色', '绿色', '黄色', '黑色', '紫色', '白色']"	6.1英寸	CPU型号:其他	其他	128GB	电池不可拆卸	1200万像素	1200万像素	以官网信息为准	9月	4719	1万

1,对手机信息做预处理

​ 1)剔除掉含有无用数据的记录(比如数据为空,数据为乱码等);
​ 2)保留:手机品牌,手机型号,手机颜色,屏幕尺寸,CPU型号,电池,续行时间,运行内存,存储内存,销售地区,销售月份,销售价格等数据,其他数据删除;
​ 3)归并同一品牌的手机在同一个地区,同一个月份下的销售价格和销售量,归并的方法是取平均值。

思路:

一,创建bean.java类
将保留字段除品牌,地区,月份写入bean.java中,同时实现Writable接口

package Clean_testB;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class clean_testB_bean implements Writable //实现Writable接口,需要重写序列化和反序列化两个方法
{private String color;private String size;private String cpu;private String Battery;private Double sales_volume;private String Running_memory;private String Storage_memory;private Double price;public String getColor() {return color;}public void setColor(String color) {this.color = color;}public String getSize() {return size;}public void setSize(String size) {this.size = size;}public String getCpu() {return cpu;}public void setCpu(String cpu) {this.cpu = cpu;}public String getBattery() {return Battery;}public void setBattery(String battery) {Battery = battery;}public String getRunning_memory() {return Running_memory;}public void setRunning_memory(String running_memory) {Running_memory = running_memory;}public String getStorage_memory() {return Storage_memory;}public void setStorage_memory(String storage_memory) {Storage_memory = storage_memory;}public Double getPrice() {return price;}public void setPrice(Double price) {this.price = price;}public Double getSales_volume() {return sales_volume;}public void setSales_volume(double sales_volume) {this.sales_volume = sales_volume;}@Overridepublic String toString() {returncolor + "\t" +size + "\t" +cpu + "\t" +Battery + "\t" +Running_memory + "\t" +Storage_memory + "\t" +price + "\t" +sales_volume;}// 序列化@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(color);dataOutput.writeUTF(size);dataOutput.writeUTF(cpu);dataOutput.writeUTF(Battery);dataOutput.writeUTF(Running_memory);dataOutput.writeUTF(Storage_memory);dataOutput.writeDouble(sales_volume);dataOutput.writeDouble(price);}// 反序列化@Overridepublic void readFields(DataInput dataInput) throws IOException {this.color = dataInput.readUTF();this.size = dataInput.readUTF();this.cpu = dataInput.readUTF();this.Battery = dataInput.readUTF();this.Running_memory = dataInput.readUTF();this.Storage_memory = dataInput.readUTF();this.sales_volume = dataInput.readDouble();this.price = dataInput.readDouble();}
}

二,创建map类

package Clean_testB;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;//										K1		  V1   K2		V2
public class clean_testB_map extends Mapper<LongWritable, Text,Text, clean_testB_bean> {private static Text text = new Text();  // 创建text对象,优化程序@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line  = value.toString();	// 取一行数据String [] split = line.split("\t");		//以\t为分隔符进行切分String brand = split[1];			// 先取出3)中需要以其为目标归并的字段,分别为品牌,地区,月份String model = split[2];String month = split[12];for (String s : split) {				// 循环判空,if (s.equals("null")||s.trim().isEmpty()||s.equals("[]")){return;}}String sales_volume = split[14];	// 继续处理数据Double sales_volume_m = 0.0;		// 处理过程中需要转型为浮点类型String sales_volume_e = "";			// 最后转型回String类型// String类型的一个方法:contains,该方法是判断字符串中是否有子字符串。如果有则返回true,没有返回falseboolean status = sales_volume.contains("万");	if (status){sales_volume_m = Double.parseDouble(sales_volume.replace("万",""))*10000;//有“万”的删除“万”并乘以10000sales_volume_e = sales_volume_m.toString();}else {sales_volume_e = sales_volume; // 没有则直接赋值}clean_testB_bean ctb = new clean_testB_bean();		// 新建一个javabean对象ctb.setColor(split[3]);					// 赋值			ctb.setSize(split[4]);ctb.setCpu(split[5]);ctb.setBattery(split[8]);ctb.setRunning_memory(split[6]);ctb.setStorage_memory(split[7]);ctb.setPrice(Double.parseDouble(split[13]));ctb.setSales_volume(Double.parseDouble(sales_volume_e));text.set(brand+"\t"+model+"\t"+month);		// 归并字符作为k值,因为MapReduce自带的合并功能(相同K合并)context.write(text,ctb);					// 传给reduce}
}

三,创建reduce类

package Clean_testB;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
//											 K2           V2	    K3		  V3
public class clean_testB_reducer extends Reducer<Text, clean_testB_bean,Text,clean_testB_bean> {@Overrideprotected void reduce(Text key, Iterable<clean_testB_bean> values, Context context) throws IOException, InterruptedException {// 因为需要继续对数据进行处理,所有需要创建变量进行赋值double price_f = 0;double sales_volume_f = 0.0;double count = 0.0;		// 取平均值 计数器String color ="";String size ="";String cpu ="";String Battery ="";String Running_memory = "";String Storage_memory = "";// for循环遍历 快捷键:iterfor (clean_testB_bean value : values) {price_f += value.getPrice();		//根据3)要求3个字段相同的销售价格和销售量取 平均值sales_volume_f += value.getSales_volume();		// k值相同,销售量相加color = value.getColor();size = value.getSize();cpu = value.getCpu();Battery = value.getBattery();Running_memory = value.getRunning_memory();Storage_memory = value.getStorage_memory();count++;			// 循环一次,自加1}double print_avg = price_f/count;		// 计算价格平均值double sales_volume_avg = sales_volume_f/count;		// 计算销售量平均值clean_testB_bean ctb = new clean_testB_bean();		// 新建javabean对象ctb.setPrice(print_avg);						// 赋值ctb.setSales_volume(sales_volume_avg);ctb.setColor(color);ctb.setSize(size);ctb.setCpu(cpu);ctb.setBattery(Battery);ctb.setRunning_memory(Running_memory);ctb.setStorage_memory(Storage_memory);context.write(key,ctb);		}
}

四,main job类

package Clean_testB;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class clean_testB_main {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();		// 固定写法Job job = Job.getInstance(conf);job.setMapperClass(clean_testB_map.class);job.setReducerClass(clean_testB_reducer.class);job.setJarByClass(clean_testB_main.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(clean_testB_bean.class);Path in = new Path(args[0]);		// 上传虚拟机运行Path out = new Path(args[1]);FileInputFormat.addInputPath(job, in);FileOutputFormat.setOutputPath(job, out);job.submit();}
}

五,打包

格式:hadoop jar jar包名.jar 主类名(META-INF中查看) 输入文件hdfs路径 输出文件hdfs路径例:
hadoop jar A.jar Clean_testB.clean_testB_main /in /out

2,分析得出手机销售量前三位的地区;

思路:使用上一步手机信息预处理后的数据,做简单排序,取Top3

一,bean

package Clean_testB_1;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class Bean implements WritableComparable<Bean> {  // WritableComparable 序列化加排序private Double price;public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}@Overridepublic String toString() {return  price + "";}// 序列化@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeDouble(price);}// 反序列化@Overridepublic void readFields(DataInput dataInput) throws IOException {this.price = dataInput.readDouble();}// compareTo方法用于比较,返回int类型,0代表相等@Overridepublic int compareTo(Bean bean) {int result = this.price.compareTo(bean.price) * -1;return result;}
}

二,map

package Clean_testB_1;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class map extends Mapper<LongWritable, Text,Text, Bean> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String []split = line.split("\t");String area = split[1];						// 取地区Bean bean = new Bean();bean.setPrice(Double.parseDouble(split[11]));  // 取销售量context.write(new Text(area),bean);}
}

三,reduce

package Clean_testB_1;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class reduce extends Reducer<Text,Bean,Text,Bean> {private static int i = 0;     // TopN关键@Overrideprotected void reduce(Text key, Iterable<Bean> values, Context context) throws IOException, InterruptedException {Double sum_price = 0.0;if (i < 3) {			// TopN关键for (Bean value : values) {sum_price += value.getPrice();}Bean bean = new Bean();bean.setPrice(sum_price);context.write(key, bean);i++;}}
}

四,main

package Clean_testB_1;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/**
*main方法与前类似,这里使用了本地模式】
*本地模式既是将输入输出地址更换成本地路径(本地模式需要在window安装hadoop)
**/
public class main {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(main.class);job.setMapperClass(map.class);job.setReducerClass(reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Bean.class);FileInputFormat.addInputPath(job,new Path("F:\\TestData\\in\\outB\\b.txt"));FileOutputFormat.setOutputPath(job,new Path("F:\\TestData\\in\\outB\\out22"));System.exit(job.waitForCompletion(true)?0:1);}
}

3,分析出各品牌手机的市场占有率

市场占有率 = (所有手机的市场销量 / 该品牌的市场销量 )* %

思路:使用两个MR,第一个MR计算所有手机的市场销量

第一个MapReduce:

一,map

package Clean_testB_2_1;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class testB_2_1_map extends Mapper<LongWritable, Text,Text, Text> {private static Text text = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String []split = value.toString().split("\t");Double sales_volume = Double.parseDouble(split[10]);text.set("所有手机销售总和");context.write(text,new Text(String.valueOf(sales_volume)));}
}

二,reduce

package Clean_testB_2_1;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.math.BigDecimal;/***   k2         v2*   品牌        销量list*/
public class testB_2_1_reducer extends Reducer<Text,Text,Text,Text> {@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {double sales = 0.0;for (Text value : values) {sales += Double.parseDouble(String.valueOf(value));}String str_sales = new BigDecimal(sales+"").toString();// 如果出现数字变成科学计数法形式,可使用BigDecimal()context.write(key,new Text(str_sales));}
}

三,main

package Clean_testB_2_1;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class main {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(main.class);job.setMapperClass(testB_2_1_map.class);job.setReducerClass(testB_2_1_reducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);Path in = new Path("F:\\TestData\\in\\outB\\test.txt");Path out = new Path("F:\\TestData\\in\\outB\\outB_31");FileInputFormat.addInputPath(job,in);FileOutputFormat.setOutputPath(job,out);System.exit(job.waitForCompletion(true)?0:1);}
}

第二个MapReduce:

将第一个MapReduce计算出来的销售总和放到第二个中

一,map

package Clean_testB_2_2.end;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/***   k2         v2*   品牌        销量list*/
public class testB_2_2_map extends Mapper<LongWritable, Text,Text, Text> {private static Text text = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String []split = value.toString().split("\t");String brand = split[0];Double sales_volume = Double.parseDouble(split[1]);text.set(brand);context.write(text,new Text(String.valueOf(sales_volume)));}
}

二,reduce

package Clean_testB_2_2.end;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.math.BigDecimal;/***   k2         v2*   品牌        销量list**   k3         v3*   品牌        (总销量/品牌数) * 100%*/
public class testB_2_2_reducer extends Reducer<Text,Text,Text,Text> {@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {double sales = 0.0;for (Text value : values) {sales += Double.parseDouble(String.valueOf(value)) / 10733905.40458202;}String str_sales = new BigDecimal(sales+"").toString();context.write(key,new Text(str_sales + "%"));}
}

三,main

package Clean_testB_2_2.end;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;public class main {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(main.class);job.setMapperClass(testB_2_2_map.class);job.setReducerClass(testB_2_2_reducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);Path in = new Path("F:\\TestData\\in\\outB\\outB_2\\part-r-00000");Path out = new Path("F:\\TestData\\in\\outB\\outB_2\\outB_3\\endB");FileInputFormat.addInputPath(job,in);FileOutputFormat.setOutputPath(job,out);System.exit(job.waitForCompletion(true)?0:1);}
}

4,分析手机销售量前三位的品牌,这三个品牌中每个品牌销售量前三位的型号

分两个MR,第一个map以品牌为K值,计算出销售量,取前三位(代码省略)

第二个MR

一,map

package Clean_TopN_s;import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;public class TopNMapper extends Mapper<LongWritable, Text, TopNBean,Text> {private HashMap<String,String> map = new HashMap<>();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {// 第一件事:将分布式缓存中的小表数据读取到本地map集合中// 1:获取分布式缓存文件列表URI[] cacheFiles = context.getCacheFiles();// 2:获取指定的分布式缓存文件的文件系统(FileSystem)FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());// 3:获取文件的输入流FSDataInputStream inputStream = fileSystem.open(new Path(cacheFiles[0]));// 4:读取文件内容,并将数据存入Map集合//4.1 将字节输入流转为字符缓冲流FSDataInputStream----->BufferedReaderBufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));//4.2 读取小表文件内容,以行为单位,并将读取的数据存入map集合String line = null;while((line = bufferedReader.readLine()) != null){map.put(line,line);}// 5:关闭流bufferedReader.close();fileSystem.close();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {/*** 半自动,需要手动填写Top3品牌*//**// 拆分文本数据,得到城市和房价String [] split =  value.toString().split("\t");if (split[0].equals("Apple")||split[0].equals("华为(HUAWEI)")||split[0].equals("飞利浦(PHILIPS)")){// 封装topNBean 得到K2TopNBean topNBean = new TopNBean();//  topNBean.setH_city(split[0]);//  topNBean.setH_price(Integer.parseInt(split[7]));topNBean.setH_city(split[0]);topNBean.setH_price(Double.parseDouble(split[10]));// 将K2,V2写入上下文context.write(topNBean,value);}else {return;}**//*** 改进后,不需要手动填写品牌*/String[] split = value.toString().split("\t");String brandname = split[0]; // K2String brandLine = map.get(brandname);if (brandLine != null) {TopNBean topNBean = new TopNBean();topNBean.setH_city(split[0]);topNBean.setH_price(Double.parseDouble(split[10]));context.write(topNBean, new Text(value));}}
}

二,reduce

package Clean_TopN_s;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class TopNReducer extends Reducer<TopNBean, Text,Text, NullWritable> {@Overrideprotected void reduce(TopNBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {int i = 0;for (Text value : values) {context.write(value, NullWritable.get());i++;if (i >= 3){break;}}}
}

三,bean

package Clean_TopN_s;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class TopNBean implements WritableComparable<TopNBean> {private String h_city;
//    private Integer h_price;private Double h_price;public String getH_city() {return h_city;}public void setH_city(String h_city) {this.h_city = h_city;}public Double getH_price() {return h_price;}public void setH_price(Double h_price) {this.h_price = h_price;}@Overridepublic String toString() {return  h_city + '\t' +h_price ;}@Overridepublic int compareTo(TopNBean topNBean) {int i = this.h_city.compareTo(topNBean.h_city);if(i == 0){i = this.h_price.compareTo(topNBean.h_price) * -1;}return i;}@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(h_city);dataOutput.writeDouble(h_price);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.h_city = dataInput.readUTF();this.h_price = dataInput.readDouble();}
}

四,分区Partitioner

package Clean_TopN_s;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;public class TopNPartitioner extends Partitioner<TopNBean, Text> {// 分区规则 根据品牌进行分区@Overridepublic int getPartition(TopNBean topNBean, Text text, int i) {return (topNBean.getH_city().hashCode() & 2147483647) % i;}
}

五,分组Group

package Clean_TopN_s;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;// 1:继承WriteableComparator
public class TopNGroupPartitioner extends WritableComparator {// 2:调用父类的有参构造public TopNGroupPartitioner() {super(TopNBean.class,true);}// 3:指定分组的规则(重写方法)@Overridepublic int compare(WritableComparable a, WritableComparable b) {// 3.1 对形参做强制类型转换TopNBean frist = (TopNBean)a;TopNBean second = (TopNBean)b;// 3.2 指定分组规则return frist.getH_city().compareTo(second.getH_city());}
}

六,main

package Clean_TopN_s;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class TopNMain {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(TopNMain.class);job.setMapperClass(TopNMapper.class);job.setReducerClass(TopNReducer.class);job.setPartitionerClass(TopNPartitioner.class);job.setGroupingComparatorClass(TopNGroupPartitioner.class);job.setMapOutputKeyClass(TopNBean.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//        Path in = new Path("F:\\TestData\\out");
//        Path out = new Path("F:\\TestData\\outoutout");Path in = new Path("F:\\TestData\\in\\outB\\part-r-00000");Path out = new Path("F:\\TestData\\in\\outB\\outB_3");FileInputFormat.addInputPath(job, in);FileOutputFormat.setOutputPath(job, out);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

5,分析在手机品牌,手机型号,手机颜色,屏幕尺寸,CPU型号,电池,续行时间,运行内存,存储内存,销售价格等参数,最影响销量前三位的参数

思路:使用wordcount,计算数据出现频次,选出出现次数top3的参数

一,map

package Clean_TopN_ss;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class map extends Mapper<LongWritable, Text,Text, IntWritable> {Text text = new Text();IntWritable intWritable = new IntWritable();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String []split = value.toString().split("\t");for (String s : split) {intWritable.set(1);text.set(s);context.write(text,intWritable);}}
}

二,reduce

package Clean_TopN_ss;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class reduce extends Reducer<Text, IntWritable,Text,IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int count = 0;for (IntWritable value : values) {count += value.get();}context.write(key,new IntWritable(count));}

2020样题思路及做法

数据使用house.json文件

部分数据如下

{"h_city": "北京", "h_url": "https://beijing.fangdd.com/zufang/2000000924373.html?SSR_JSON=false", "h_area": "朝-阳-区", "h_name": "华腾园", "h_type": "3室1厅", "h_size": "18㎡", "h_fac": "空调+床+书桌", "h_price": "3770", "h_pdate": "2020-11-04"}
{"h_city": "上海", "h_url": "https://shanghai.fangdd.com/zufang/4000000035998.html?SSR_JSON=false", "h_area": "青-浦", "h_name": "东方明珠花园", "h_type": "5室2厅", "h_size": "198㎡", "h_fac": "宽带+空调+洗衣机+冰箱+热水器+烤箱+微波炉+灶具+油烟机+电磁炉+沙发+电视+床+书桌+衣柜+阳台+独立阳台+独立卫生间+飘窗+暖气+天然气+橱柜+椅子", "h_price": "13000", "h_pdate": "2020-08-20"}

1,剔除除了房源所在城市,房源地址,户型,面积,配套设施,租金,发布时间之外的其他所有附加信息;

2,房源所在的城市,面积,租金等为空的记录对于后续的计算和分析基本没有帮助,也不可以用插值的方法修补租金额度,避免给后续的处理带来误导,请剔除租金为空的记录,并输出剔除的条目数量信息,截图并保存结果

Map

package Clean_MR;import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class Clean_map extends Mapper<LongWritable, Text,Text, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();JSONObject jsonObject = JSONObject.parseObject(line); // json解析String[] datas = new String[7];				datas[0] = jsonObject.getString("h_city");	// 取所需字段datas[1] = jsonObject.getString("h_name");datas[2] = jsonObject.getString("h_type");datas[3] = jsonObject.getString("h_size");datas[4] = jsonObject.getString("h_fac");datas[5] = jsonObject.getString("h_price");datas[6] = jsonObject.getString("h_pdate");Counter counter = context.getCounter("MR_COUNTER","删除的记录数为");	// counter计数for (String  data: datas) {							// 遍历字段里面有空值的列数并剔除if (data==null || data.trim().isEmpty() || data.equals("")){counter.increment(1L);		// counter加一return;					// MR_Counter 删除的记录数为=74}}String end = "";			// 剩余字段拼接成K值for (String data : datas) {end = end + data + "\t";}context.write(new Text(end),NullWritable.get());	// 输出K2}
}

reduce

package Clean_MR;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
/**
*  reduce端无需额外处理,直接输出K3,V3
**/
public class Clean_reduce extends Reducer<Text, NullWritable ,Text,NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {context.write(key,NullWritable.get());}
}

main

package Clean_MR;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class Clean_Job extends Reducer<Text, NullWritable ,Text,NullWritable> {@Overrideprotected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {context.write(key,NullWritable.get());}
}

3,同一房源可能会出现多条记录,请把同一房源的多条记录进行合并,房源租金取多条记录的平均值,输出合并的条目数量,截图并保存结果;

同一房源 合并 取平均 输出合并条目

map

package Clean_MR_step2;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class Clean_step2_map extends Mapper<LongWritable, Text,Text,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] split = line.split("\t");StringBuffer stringBuffer = new StringBuffer();	// new一个stringBuffer数组for (String s : split) {if (s != split[5]){		// 除了房源租金字段不取外stringBuffer.append(s).append("\t");}}context.write(new Text(stringBuffer.toString()),new Text(split[5]));}
}

reduce

package Clean_MR_step2;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class Clean_step2_reduce extends Reducer<Text,Text,Text,Text> {@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {Counter counter = context.getCounter("MR_COUNTER","合并条目数");double sum = 0.0; 	// 计算K值相同 V值的总和double avg;int i = 0;		//	K相同条数for (Text value : values) {sum += Double.parseDouble(String.valueOf(value));i++;}if (i>=2){	 // 相同的数据大于2才需要合并  才需要加入合并条目数counter.increment(1L);}avg = sum / i;	// 多条记录平均值context.write(key,new Text(String.valueOf(avg)));	//输出K3,V3}}

main

package Clean_MR_step2;import Clean_MR.Clean_map;
import Clean_MR.Clean_reduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class Clean_step2_main {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(Clean_step2_main.class);job.setMapperClass(Clean_step2_map.class);job.setReducerClass(Clean_step2_reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job,new Path("F:\\TestData\\in\\new\\part-r-00000"));FileOutputFormat.setOutputPath(job,new Path("F:\\TestData\\in\\new\\newout"));System.exit(job.waitForCompletion(true)?0:1);}
}

4,统计以城市为单位的所有房源的均价,输出结果并截取均价为前五的数据保存;

房租均价 城市 前五

map

package Clean_MR_step3;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class Clean_step3_map extends Mapper<LongWritable, Text,Text, Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] split = value.toString().split("\t");String city = split[0];		// 取城市String price = split[split.length-1];	// 租房价格context.write(new Text(city),new Text(price));}
}

reduce

package Clean_MR_step3;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.*;public class Clean_step3_reduce extends Reducer<Text, Text,Text,Text> {Map<String,Double> map= new HashMap<>();		//new 一个hash表,用于存放每个,单个reduce的结果@Overrideprotected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text,Text,Text>.Context context) {Double sum = 0.0;	// 求房租均价,需要先算出同一个城市的所有房租的总价int count = 0;		// 当前城市下,房间数量for (Text value : values) {sum += Double.parseDouble(String.valueOf(value));count++;}double avg = sum / count;		// 计算房租均值map.put(String.valueOf(key),avg);		// 将结果放入hash 表中}/***	重写cleanup方法,对reduce产生总的数据进行**/@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException  {//这里将map.entrySet()转换成listList<Map.Entry<String,Double>> list = new LinkedList<>(map.entrySet());//通过比较器来实现排序Collections.sort(list,new Comparator<Map.Entry<String,Double>>() {//降序排序@Overridepublic int compare(Map.Entry<String, Double> o1, Map.Entry<String, Double> o2) {return o1.getValue().compareTo(o2.getValue()) * -1;	}});for(int i=0;i<5;i++){	// 取前五context.write(new Text(list.get(i).getKey()), new Text(String.valueOf(list.get(i).getValue())));}}}

main

package Clean_MR_step3;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class Clean_step3_main {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(Clean_step3_main .class);job.setMapperClass(Clean_step3_map .class);job.setReducerClass(Clean_step3_reduce .class);job.setOutputKeyClass(Text .class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job,new Path("F:\\TestData\\in\\new\\newout\\part-r-00000"));
//        FileInputFormat.addInputPath(job,new Path("F:\\TestData\\in\\new\\newout\\test.txt"));FileOutputFormat.setOutputPath(job,new Path("F:\\TestData\\in\\new\\newout\\out"));System.exit(job.waitForCompletion(true)?0:1);}}

5,统计以城市为单位的租房总面积,输出面积最大的前五个城市并截图保存结果;

以城市为单位 租房总面积 前五

map

package Clean_MR_step4;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class Clean_step4_map extends Mapper<LongWritable, Text,Text,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] split = line.split("\t");Double area = Double.parseDouble(split[3].replace("㎡",""));		// 取面积,数据处理去除㎡String city = split[0];		// 取面积context.write(new Text(city),new Text(area.toString()));}
}

reduce

package Clean_MR_step4;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.*;public class Clean_step4_reduce extends Reducer<Text,Text,Text,Text> {Map<String,Double>map = new HashMap<>();		 // 创建一个hash表  存放数据用于cleanup排序@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {Double sum = 0.0;for (Text value : values) {sum += Double.parseDouble(String.valueOf(value));}map.put(String.valueOf(key),sum);}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {List<Map.Entry<String,Double>> list = new LinkedList<>(map.entrySet());Collections.sort(list, new Comparator<Map.Entry<String, Double>>() {@Overridepublic int compare(Map.Entry<String, Double> o1, Map.Entry<String, Double> o2) {return o1.getValue().compareTo(o2.getValue()) * -1;}});for(int i=0;i<5;i++){context.write(new Text(list.get(i).getKey()),new Text(String.valueOf(list.get(i).getValue())));}}
}

main

package Clean_MR_step4;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class Clean_step4_main {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(Clean_step4_main.class);job.setMapperClass(Clean_step4_map.class);job.setReducerClass(Clean_step4_reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//        FileInputFormat.addInputPath(job,new Path("F:\\TestData\\in\\new\\newout\\test.txt"));FileInputFormat.addInputPath(job,new Path("F:\\TestData\\in\\new\\newout\\part-r-00000"));FileOutputFormat.setOutputPath(job,new Path("F:\\TestData\\in\\new\\newout\\out5"));System.exit(job.waitForCompletion(true)?0:1);}
}

6,统计以房源城市,月份为单位的均价数据和房源数量数据,结果保存到mysql数据库中;

(房源城市 月份) 均价 房源数量

map

package Clean_MR_step5;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;public class Clean_step5_map extends Mapper<LongWritable, Text,Text,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] split = value.toString().split("\t");// 字符串substring提取String city = split[0];String month = String.valueOf(split[5]).substring(5,7);// 正则表达式
//        String pattern = "-(.*?)-";
//        Pattern r = Pattern.compile(pattern);
//        Matcher m = r.matcher(split[5]);
//        String month = m.group(1);String price = split[7];context.write(new Text(city+ "\t"+month),new Text(price));}
}

reduce

package Clean_MR_step5;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.text.DecimalFormat;public class Clean_step5_reduce extends Reducer<Text, Text,Text,Text> {@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {Double sum = 0.0;int beichushu = 0;for (Text value : values) {sum += Double.parseDouble(String.valueOf(value));beichushu++;}DecimalFormat df = new DecimalFormat("#.000000");	//	保留6位小数double end = Double.parseDouble(df.format(sum / beichushu));context.write(key,new Text(end +"\t"+beichushu));}
}

main

7,使用方差来分析各个城市的放假分布情况,输出城市发展比较均衡的前五个城市输出并截图保存。

方差公式:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-g3vrQoru-1624806513021)(2020-12-28 2020-12-23 大数据竞赛知识点 141656.assets/image-20210331155115605.png)]

map

package Clean_MR_step6;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class Clean_step6_map extends Mapper<LongWritable, Text,Text,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] split = value.toString().split("\t");String city = split[0];String price = split[7];context.write(new Text(city),new Text(price));}
}

reduce

package Clean_MR_step6;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.*;public class Clean_step6_reduce extends Reducer<Text,Text,Text,Text> {private ArrayList<Double> lengths = new ArrayList<Double>();	// 存放values遍历出来的valueMap<String,Double>map = new HashMap<>();					// 因为对reduce遍历只能遍历一次,所以															想要再次利用value值只能将其放在新建的一个															数组里面@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {Double sum = 0.0;int count = 0;Double var = 0.0;for (Text value : values) {		// 只能遍历一次sum += Double.parseDouble(String.valueOf(value));count++;lengths.add(Double.parseDouble(String.valueOf(value)));		// 将value放入arraylist中}Collections.sort(lengths);// 求平均值Double mean = sum / count;// 求方差for (Double value2 : lengths) {
//            var = ((value2 - mean) * (value2 - mean) )/ count;var += (value2 - mean) * (value2 - mean)/ count;}map.put(String.valueOf(key),var);}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {List<Map.Entry<String,Double>> list = new LinkedList<>(map.entrySet());Collections.sort(list, new Comparator<Map.Entry<String, Double>>() {@Overridepublic int compare(Map.Entry<String, Double> o1, Map.Entry<String, Double> o2) {return o1.getValue().compareTo(o2.getValue());}});for(int i=0;i<5;i++){context.write(new Text(list.get(i).getKey()),new Text(String.valueOf(list.get(i).getValue())));}}
}

main

package Clean_MR_step6;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class Clean_step6_main {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(Clean_step6_main.class);job.setMapperClass(Clean_step6_map.class);job.setReducerClass(Clean_step6_reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//        FileInputFormat.addInputPath(job,new Path("F:\\TestData\\in\\new\\newout\\test6.txt"));FileInputFormat.addInputPath(job,new Path("F:\\TestData\\in\\new\\newout\\part-r-00000"));FileOutputFormat.setOutputPath(job,new Path("F:\\TestData\\in\\new\\newout\\out7"));System.exit(job.waitForCompletion(true)?0:1);}}

8,以120平方为分界线,分成大面积房和小面积房。大面积房和小面积房的租房热度主要体现在平均单位面积租金上,若一个城市大面积住房的平均单位面积租金比小面积房的平均单位面积租金少,并且价格差距较大,则说明该城市的年轻人数量多,间接说明该城市的就业比较好,经济比较有活力。设计分析方法,并输出比较有活力的前五位城市。

大面积房>120 小面积房小于120 平均单位面积—>租金除以面积
活力–>大面积房平均单位面积 与 小面积房平均单位面积差值

分成两个MapReduce写

bean

package Clean_MR_step7;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Clean_step7_bean implements Writable {private Double area;private Double rent;public Double getArea() {return area;}public void setArea(Double area) {this.area = area;}public Double getRent() {return rent;}public void setRent(Double rent) {this.rent = rent;}@Overridepublic String toString() {returnarea +"\t"+rent;}@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeDouble(area);dataOutput.writeDouble(rent);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.area = dataInput.readDouble();this.rent = dataInput.readDouble();}
}

map

package Clean_MR_step7;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class Clean_step7_map extends Mapper<LongWritable, Text,Text,Clean_step7_bean> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] split = value.toString().split("\t");String city = split[0];Clean_step7_bean cs7b = new Clean_step7_bean();cs7b.setArea(Double.parseDouble(split[3].replace("㎡","")));			// 取面积放入bean对象中cs7b.setRent(Double.parseDouble(split[7]));						// 取租金放入bean对象中context.write(new Text(city),cs7b);	// 城市,(面积,租金)}
}

partitioner

package Clean_MR_step7;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;public class Clean_step7_partitioner extends Partitioner<Text,Clean_step7_bean> {@Overridepublic int getPartition(Text text, Clean_step7_bean clean_step7_bean, int i) {Double numStr = clean_step7_bean.getArea();if (numStr > 120){		// 分两个结果		面积大于120 与 面积小于120return 1;}else {return 0;}}
}

reduce

package Clean_MR_step7;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class Clean_step7_reduce extends Reducer<Text,Clean_step7_bean,Text,Text> {@Overrideprotected void reduce(Text key, Iterable<Clean_step7_bean> values, Context context) throws IOException, InterruptedException {Double sum_area = 0.0;Double sum_rent = 0.0;Double avg;for (Clean_step7_bean value : values) {	// 同城市	面积	租金	相加sum_area += value.getArea();sum_rent += value.getRent();}avg = sum_rent / sum_area;		// 平均单位面积context.write(key,new Text(String.valueOf(avg)));}
}

main

package Clean_MR_step7;import Clean_MR_step4.Clean_step4_main;
import Clean_MR_step4.Clean_step4_map;
import Clean_MR_step4.Clean_step4_reduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class Clean_step7_main {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(Clean_step7_main.class);job.setMapperClass(Clean_step7_map.class);job.setReducerClass(Clean_step7_reduce.class);job.setPartitionerClass(Clean_step7_partitioner.class);		// 指定分区类job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Clean_step7_bean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setNumReduceTasks(2);		// 结果文件数量//        FileInputFormat.addInputPath(job,new Path("F:\\TestData\\in\\new\\newout\\test.txt"));FileInputFormat.addInputPath(job,new Path("F:\\TestData\\in\\new\\newout\\part-r-00000"));FileOutputFormat.setOutputPath(job,new Path("F:\\TestData\\in\\new\\newout\\out8"));System.exit(job.waitForCompletion(true)?0:1);}
}
step2

map

package Clean_MR_step7_step2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;public class Clean_step7_2_map extends Mapper<LongWritable, Text,Text,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] split = value.toString().split("\t");context.write(new Text(split[0]),new Text(split[1]));}
}

reduce

package Clean_MR_step7_step2;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.*;public class Clean_step7_2_reduce extends Reducer<Text,Text,Text,Text> {List<Double> list = new ArrayList<Double>();Map<String,Double> map= new HashMap<String,Double>();private static int n = 0;@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {Double D_value = 0.0;int count = 0;for (Text value : values) {list.add(Double.parseDouble(String.valueOf(value)));count++;}if (count==2){		//在step1中将结果分成了大于120和小于120两个文件,从中各取一个(同城市)进行判断数量是否等于2D_value = Math.abs(list.get(0+n)-list.get(1+n));	// 取绝对值,因为放在了list中,list在函数之外,所以会															不断累加,所以取值的时候需要加上nn = n+2;		// 两个就加2}else if (count==1){	// 不能保证数据里有大于120和小于120都有的,可能只有大于120或者小于120其中一个n++;				// 只有一个的时候只能加1return;}map.put(String.valueOf(key),D_value);}// cleanup排序 取top5@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {List<Map.Entry<String, Double>> list = new LinkedList<>(map.entrySet());Collections.sort(list, new Comparator<Map.Entry<String, Double>>() {@Overridepublic int compare(Map.Entry<String, Double> o1, Map.Entry<String, Double> o2) {return o1.getValue().compareTo(o2.getValue());}});for(int i=0;i<5;i++){context.write(new Text(list.get(i).getKey()),new Text(String.valueOf(list.get(i).getValue())));}}
}

idea 快捷键

iter (增强型for循环)

for (FlowBean value : values) {}

ctrl + alt + v (生成返回值)

.var (生成返回值)

注意的点

replace失效

​ replace需要返回值

分割(空白 正则表达式)

split(\\s+) \\s表示 空格,回车,换行等空白符,
+号表示一个或多个的意思也就是说它是按空白部分进行拆分,不管这个空白使用设么操作留下
的,提如空格键 tab键

匹配非数字(空白 匹配非数字字符的字符)

大写表示“非”,d表示digit数字。非数字就是\D

Jar包上传 找不到主类

Exception in thread "main" java.lang.ClassNotFoundException:

解决办法:

​ 查看idea程序META-INF目录下的MANIFEST.MF文件

Manifest-Version: 1.0
Main-Class: Clean_2020_three.Merge_main

命令修改为

hadoop jar jar包地址 主类全限定名称 输入数据位置 输出结果位置

JAVA数据处理函数积累

startsWith() 如果字符串以指定的前缀开始

参数
  • prefix – 前缀。
  • toffset – 字符串中开始查找的位置。
返回值

如果字符串以指定的前缀开始,则返回 true;否则返回 false。

String.valueOf() 将基本数据型态转换成 String 类型

DecimalFormat 格式化数据(小数点位数,科学计数法)

double` `pi = ``3.1415927``; ``//圆周率``//取一位整数System.out.println(``new` `DecimalFormat(``"0"``).format(pi));   ``//3``//取一位整数和两位小数System.out.println(``new` `DecimalFormat(``"0.00"``).format(pi)); ``//3.14``//取两位整数和三位小数,整数不足部分以0填补。System.out.println(``new` `DecimalFormat(``"00.000"``).format(pi));``// 03.142``//取所有整数部分System.out.println(``new` `DecimalFormat(``"#"``).format(pi));   ``//3``//以百分比方式计数,并取两位小数System.out.println(``new` `DecimalFormat(``"#.##%"``).format(pi)); ``//314.16%``long` `c =``299792458``;  ``//光速``//显示为科学计数法,并取五位小数System.out.println(``new` `DecimalFormat(``"#.#####E0"``).format(c)); ``//2.99792E8``//显示为两位整数的科学计数法,并取四位小数System.out.println(``new` `DecimalFormat(``"00.####E0"``).format(c)); ``//29.9792E7``//每三位以逗号进行分隔。System.out.println(``new` `DecimalFormat(``",###"``).format(c));   ``//299,792,458``//将格式嵌入文本System.out.println(``new` `DecimalFormat(``"光速大小为每秒,###米。"``).format(c));
;}});for(int i=0;i<5;i++){context.write(new Text(list.get(i).getKey()),new Text(String.valueOf(list.get(i).getValue())));}}
}

idea 快捷键

iter (增强型for循环)

for (FlowBean value : values) {}

ctrl + alt + v (生成返回值)

.var (生成返回值)

注意的点

replace失效

​ replace需要返回值

分割(空白 正则表达式)

split(\\s+) \\s表示 空格,回车,换行等空白符,
+号表示一个或多个的意思也就是说它是按空白部分进行拆分,不管这个空白使用设么操作留下
的,提如空格键 tab键

匹配非数字(空白 匹配非数字字符的字符)

大写表示“非”,d表示digit数字。非数字就是\D

Jar包上传 找不到主类

Exception in thread "main" java.lang.ClassNotFoundException:

解决办法:

​ 查看idea程序META-INF目录下的MANIFEST.MF文件

Manifest-Version: 1.0
Main-Class: Clean_2020_three.Merge_main

命令修改为

hadoop jar jar包地址 主类全限定名称 输入数据位置 输出结果位置

JAVA数据处理函数积累

startsWith() 如果字符串以指定的前缀开始

参数
  • prefix – 前缀。
  • toffset – 字符串中开始查找的位置。
返回值

如果字符串以指定的前缀开始,则返回 true;否则返回 false。

String.valueOf() 将基本数据型态转换成 String 类型

DecimalFormat 格式化数据(小数点位数,科学计数法)

double` `pi = ``3.1415927``; ``//圆周率``//取一位整数System.out.println(``new` `DecimalFormat(``"0"``).format(pi));   ``//3``//取一位整数和两位小数System.out.println(``new` `DecimalFormat(``"0.00"``).format(pi)); ``//3.14``//取两位整数和三位小数,整数不足部分以0填补。System.out.println(``new` `DecimalFormat(``"00.000"``).format(pi));``// 03.142``//取所有整数部分System.out.println(``new` `DecimalFormat(``"#"``).format(pi));   ``//3``//以百分比方式计数,并取两位小数System.out.println(``new` `DecimalFormat(``"#.##%"``).format(pi)); ``//314.16%``long` `c =``299792458``;  ``//光速``//显示为科学计数法,并取五位小数System.out.println(``new` `DecimalFormat(``"#.#####E0"``).format(c)); ``//2.99792E8``//显示为两位整数的科学计数法,并取四位小数System.out.println(``new` `DecimalFormat(``"00.####E0"``).format(c)); ``//29.9792E7``//每三位以逗号进行分隔。System.out.println(``new` `DecimalFormat(``",###"``).format(c));   ``//299,792,458``//将格式嵌入文本System.out.println(``new` `DecimalFormat(``"光速大小为每秒,###米。"``).format(c));

http://www.ppmy.cn/news/270392.html

相关文章

LED数显驱动芯片,LED数码管驱动芯片VK1650,LED驱动控制IC,键盘扫描集成电路,内置RC震荡电路

VINKA/永嘉微电LED显示屏驱动主要应用于以下&#xff1a; 1&#xff1a;VCR、VCD、DVD 及家庭影院等显示屏驱动。 2&#xff1a;电磁炉、微波炉、冰箱、空调 、家庭影院等高段位显示屏驱动。 3&#xff1a;LED显示屏驱动&#xff0c;电子秤及小家电显示屏驱动。 4&#xf…

LED驱动IC/LED数显、数码管显示驱动芯片VK1651,带键盘扫描功能,内置上电复位电路和RC震荡电路,串行接口(CLK , DIO)

产品品牌&#xff1a;永嘉微电/VINKA 工程服务 技术支持 型号&#xff1a;VK1651 封装&#xff1a;DIP16/SOP16 年份&#xff1a;新年份 概述&#xff1a; VK1651是一种带键盘扫描接口的LED&#xff08;发光二极管显示器&#xff09;驱动控制专用电路&#xff0c;内部集…

用粉红噪声煲机_虾米歌单 | 【科学煲耳机】(白噪音 粉红噪音 无损) - 虾米音乐...

【科学煲耳机】(白噪音 粉红噪音 无损) 慢煲出好声唯一普遍适用的煲耳机方法是“渐进”&#xff0c;刚开始用轻柔一些的音乐&#xff0c;在较低音量下让耳机先舒缓10-30小时&#xff0c;然后用普通的音乐(摇滚、舞曲除外)在中等音量状态煲100-200小时&#xff1b;如果这时你听着…

整理2020智能车竞赛网站各分赛区报名情况

作者:卓晴博士&#xff0c;清华大学自动化系 全国大学生智能车竞赛秘书处 2020-07-25 Saturday □ 东北赛区 序号学校队伍组别联系电话1大连理工大学大连理工大学四轮组四轮组153059129942沈阳航空航天大学奇迹双车组接力组188932105123辽宁科技大学loveliveAI电磁组1556626186…

常用笔记本电脑检测软件(含下载)

如今&#xff0c;购买笔记本电脑已经很普遍&#xff0c;但很多朋友因购买到劣质笔记本电脑而懊恼&#xff0c;我明天要去买二手笔记本电脑&#xff0c;为了避免上当&#xff0c;在网上好不容易找到一些相关工具&#xff0c;希望对大家有帮助。 本文转载自&#xff1a; 重庆二手…

验机软件大集合(CPU、硬盘、显卡、系统检测类有更新)[转自it168]

来源 http://benyouhui.it168.com/viewthread.php?tid277506 综合类 验机软件Everest_Pro_1.51汉化版&#xff08;强力推荐&#xff0c;好东西&#xff0c;5月23日加入2006测试版&#xff09; http://benyouhui.it168.com/viewthread.php?tid274945 Windows操作系统下的硬件检…

电脑配置检测软件下载

1、EVEREST Home 2.00.327 Beta&#xff08;本人置顶推荐的检测软件&#xff09; 说明&#xff1a;EVEREST&#xff08;原名AIDA32&#xff09;一个测试软硬件系统信息的工具&#xff0c;它可以详细的显示出PC每一个方面的信息。支持上千种(3400)主板&#xff0c;支持上百种(36…

常用装机检测软件

------------------ 一、综合性检测分析 ------------------ 1、EVEREST Home Edition 2.50&#xff08;本人置顶推荐的检测软件&#xff09; 说明&#xff1a;EVEREST&#xff08;原名AIDA32&#xff09;一个测试软硬件系统信息的工具&#xff0c;它可以详细的显示出PC每一…