一、实验描述
根据数据文件phone_data.txt按照如下需求:
1)统计每一个手机号耗费的总上行流量、下行流量、总流量
2)将统计结果按照手机归属地不同号段(手机号前3位)输出到不同文件中
3)根据需求1)产生的结果再次对总流量进行排序。
4)按照要求2)每个手机号段输出的文件中按照总流量内部排序。
数据:
参看 phone_data.txt 文件
输入数据格式:
输出数据格式:
二、实验分析
(1) 由于需要统计每一个手机号耗费的总上行流量、下行流量、总流量,故可以将手机号当作键、其上行流量与上行流量以及总流量构成的Array作为值保存在Map中。通过读取文件并遍历每一行,为Map添加键值或更新键的值。然后遍历Map将统计结果写入文件。
(2) 在(1)的基础上需要按照手机归属地不同号段(手机号前3位)输出到不同文件中,这一需求可以通过读取(1)的统计结果文件并遍历行,将每行的前三个字符作为键、行字符串构成的Array作为值保存在Map中,在遍历过程中添加键值或更新值(Array新增一个行字符串)。然后遍历Map,以键为文件名,并遍历值作为文件的内容。
(3) 在(1)的基础上需要按总流量排序,这一需求可以通过读取(1)的统计结果文件并遍历行,将行字符串作为Tuple的第一个元素、总流量作为Tuple的第二个元素保存在Array中,然后通过Array的sortBy方法排序并遍历排序结果同时写入文件中。
(4) 在(1)的基础上需要按照要求(2)每个手机号段输出的文件中按照总流量内部排序,这个需求其实是(2)、(3)的结合。这一需求可以通过读取(1)的统计结果文件并遍历行,将每行的前三个字符作为键、行字符串构成的Array作为值保存在Map中,在遍历过程中添加键值或更新值(Array新增一个行字符串)。然后遍历Map,以键为文件名构建PrintWriter对象,同时遍历值,把值的各个元素作为Tuple的第一个元素、并从各个元素分割总流量作为Tuple的第二个元素保存在Array中,通过Array的sortBy方法排序并遍历排序结果,同时通过构建的PrintWriter对象写入文件中。
三、实验代码
(1) PhoneFlowSum.scala
package cn.edu.neu.Question_1import java.io.{File, PrintWriter}import scala.io.Sourceobject PhoneFlowSum {def main(args: Array[String]): Unit ={val file = Source.fromFile("src/main/resources/phone_data.txt")val lines = file.getLines()val phoneFlowMap = scala.collection.mutable.Map[String, Array[Int]]()lines.foreach(line => {val strs = line.split(" {4}")if(phoneFlowMap.contains(strs(1))){val newFlow = Array(strs(8).toInt, strs(9).toInt, strs(8).toInt+strs(9).toInt)var i = 0for (elem <- newFlow) {phoneFlowMap(strs(1))(i) += elemi += 1}}else{phoneFlowMap+=(strs(1) -> Array(strs(8).toInt, strs(9).toInt, strs(8).toInt+strs(9).toInt))}})val out = new PrintWriter("src/main/resources/question_1.txt")for ((k, v) <- phoneFlowMap) {out.print(k+"\t")v.foreach(flow => out.print(flow+"\t"))out.println()out.flush()}out.close()}
}
(2) PhoneFlowSumPartition.scala
package cn.edu.neu.Question_2import java.io.PrintWriterimport scala.io.Sourceobject PhoneFlowSumPartition {def main(args: Array[String]): Unit ={val file = Source.fromFile("src/main/resources/question_1.txt")val lines = file.getLines()val partitions = scala.collection.mutable.Map[String, Array[String]]()lines.foreach(line => {if(partitions.contains(line.substring(0,3))){partitions.update(line.substring(0,3),partitions(line.substring(0,3)):+line)}else{partitions+=(line.substring(0,3) -> Array(line))}})for ((k, v) <- partitions) {val out = new PrintWriter("src/main/resources/question_2/"+k+".txt")v.foreach(phoneFlow => out.println(phoneFlow))out.flush()out.close()}}
}
(3) PhoneFlowSort.scala
package cn.edu.neu.Question_3import java.io.PrintWriterimport scala.io.Sourceobject PhoneFlowSumSort {def main(args: Array[String]): Unit ={val file = Source.fromFile("src/main/resources/question_1.txt")val lines = file.getLines()val flowArrs = new Array[(String, Int)](20)var i = 0for (elem <- lines) {println(elem)val strs = elem.split("\t")flowArrs(i)=(elem, strs(3).toInt)i += 1}val sortFlow = flowArrs.sortBy(flow => flow._2)// println(sortFlow.getClass)val out = new PrintWriter("src/main/resources/question_3.txt");sortFlow.foreach(elem => {out.println(elem._1);out.flush()})out.close()}
}
(4) PhoneFlowSumPartitionSort.scala
package cn.edu.neu.Question_4import java.io.PrintWriterimport scala.io.Sourceobject PhoneFlowSumPartitionSort {def main(args: Array[String]): Unit ={val file = Source.fromFile("src/main/resources/question_1.txt")val lines = file.getLines()val partitions = scala.collection.mutable.Map[String, Array[String]]()lines.foreach(line => {if(partitions.contains(line.substring(0,3))){partitions.update(line.substring(0,3),partitions(line.substring(0,3)):+line)}else{partitions+=(line.substring(0,3) -> Array(line))}})for ((k, v) <- partitions) {val flowArrs = new Array[(String, Int)](v.length)val out = new PrintWriter("src/main/resources/question_4/"+k+".txt")var i = 0for (elem <- v) {val strs = elem.split("\t")flowArrs(i) = (elem, strs(3).toInt)i += 1}val sortFlow = flowArrs.sortBy(flow => flow._2)(Ordering.Int.reverse)for (elem <- sortFlow) {out.println(elem._1);out.flush()}out.close()}}
}