Table of Contents
Goal
Historical Popular Movies
Recently Popular Movies
Average Movie Rating
Top Movies per Genre
Full Code
Goal

Load data from MongoDB, run the four offline statistics implemented below — historical popular movies, recently popular movies, average movie rating, and top movies per genre — and write the results back to MongoDB.
Historical Popular Movies

Using all historical rating data, find the movies with the most ratings overall.

select mid, count(mid) as count from ratings group by mid;

// 1. Historical popularity: movies with the most ratings overall
val historicalHotMoviesDF = spark.sql(
  "select mid, count(mid) as count " +
  "from ratings " +
  "group by mid")
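The grouping logic behind this SQL can be sketched outside of Spark with plain Scala collections; the sample tuples below are made up for illustration:

```scala
// Hypothetical in-memory stand-in for the "ratings" table:
// (uid, mid, score, timestamp) tuples.
val ratings = Seq(
  (1, 101, 4.0, 1260759144),
  (2, 101, 3.5, 1260759200),
  (3, 102, 5.0, 1260759300)
)

// Same aggregation as: select mid, count(mid) as count from ratings group by mid
val countsByMid: Map[Int, Int] =
  ratings.groupBy(_._2).map { case (mid, rows) => mid -> rows.size }
```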
Recently Popular Movies

Using the ratings grouped by month, find the set of movies with the most ratings in each recent month.

Define a UDF that converts a timestamp into a year-month value:

spark.udf.register("changeDate", (x: Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)
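Stripped of the Spark registration, the UDF body is just a seconds-to-yyyyMM conversion; a standalone sketch:

```scala
import java.text.SimpleDateFormat
import java.util.Date

// The UDF body: a seconds-based Int timestamp -> a yyyyMM Int.
// Note the * 1000L: java.util.Date expects milliseconds, so a raw
// Int of seconds must be widened and scaled first.
val simpleDateFormat = new SimpleDateFormat("yyyyMM")
def changeDate(x: Int): Int =
  simpleDateFormat.format(new Date(x * 1000L)).toInt
```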
SQL to convert the time format and count per month:

select mid, score, changeDate(timestamp) as yearmonth from ratings;
select mid, count(mid) as count, yearmonth from ratingOfMonth group by yearmonth, mid;

// 2. Recent popularity, by timestamp
// Create a date formatter
val simpleDateFormat = new SimpleDateFormat("yyyyMM")
// Register a UDF that converts a timestamp to a yyyyMM value
// Note: new Date() expects milliseconds, hence * 1000L
spark.udf.register("changeDate", (x: Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)
// Preprocess: drop uid and convert the timestamp
val ratingOfYearMonth = spark.sql(
  "select mid, score, changeDate(timestamp) as yearmonth " +
  "from ratings")
ratingOfYearMonth.createOrReplaceTempView("ratingOfMonth")
val recentlyHotMovieDF = spark.sql(
  "select mid, count(mid) as count, yearmonth from ratingOfMonth " +
  "group by yearmonth, mid order by yearmonth desc, count desc")
storeDFInMongoDB(recentlyHotMovieDF, RECENTLY_HOT_MOVIES)
Average Movie Rating

Using all users' historical ratings, periodically compute each movie's average score.

select mid, avg(score) as avg from ratings group by mid

val averageMoviesDF = spark.sql("select mid, avg(score) as avg from ratings group by mid")
storeDFInMongoDB(averageMoviesDF, AVERAGE_MOVIES)
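The avg(score) aggregation has the same shape as the count query above; a plain-Scala sketch over made-up (uid, mid, score) tuples:

```scala
// Hypothetical sample ratings: (uid, mid, score).
val ratings = Seq((1, 101, 4.0), (2, 101, 3.0), (3, 102, 5.0))

// Same aggregation as: select mid, avg(score) as avg from ratings group by mid
val avgByMid: Map[Int, Double] =
  ratings.groupBy(_._2).map { case (mid, rows) =>
    mid -> rows.map(_._3).sum / rows.size
  }
```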
Top Movies per Genre

For each of the given movie genres, compute the ten highest-rated movies within that genre.

After computing every movie's average score, take the Cartesian product of the movie set and the genre list, then filter out the pairs whose movie does not belong to the genre.

Core code

val genresTopMoviesDF = genresRDD.cartesian(movieWithScore.rdd)
  .filter {
    case (genre, row) => row.getAs[String]("genres").toLowerCase.contains(genre.toLowerCase())
  }
  .map {
    case (genre, row) => (genre, (row.getAs[Int]("mid"), row.getAs[Double]("avg")))
  }
  .groupByKey()
  .map {
    case (genres, items) => GenresRecommendation(genres,
      items.toList.sortWith(_._2 > _._2).take(10).map(item => Recommendation(item._1, item._2)))
  }
  .toDF
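The cartesian/filter/groupByKey pipeline can be mirrored with plain Scala collections to see the logic without a Spark cluster; the MovieScore rows below are made-up stand-ins for the joined movieWithScore data:

```scala
// Hypothetical (mid, genres, avg) rows after joining movies with average scores.
case class MovieScore(mid: Int, genres: String, avg: Double)
val movieWithScore = Seq(
  MovieScore(1, "Action|Comedy", 4.5),
  MovieScore(2, "Action", 3.0),
  MovieScore(3, "Comedy|Drama", 4.0)
)
val genres = List("Action", "Comedy")

// Same steps as the RDD pipeline: pair every genre with every movie (the
// Cartesian product), keep pairs whose genres string contains the genre,
// group by genre, then sort by average score and take the top 10.
val topPerGenre: Map[String, List[(Int, Double)]] =
  (for {
    g <- genres
    m <- movieWithScore
    if m.genres.toLowerCase.contains(g.toLowerCase)
  } yield (g, (m.mid, m.avg)))
    .groupBy(_._1)
    .map { case (g, items) =>
      g -> items.map(_._2).sortWith(_._2 > _._2).take(10)
    }
```

Note that substring matching on the genres string is what the original filter does too; a genre like "Science" would also match "Science Fiction", which is the trade-off of `contains`.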
Full Code

package com.statistisc

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Offline statistics for recommendation:
 * 1. Historical popular movies
 * 2. Recently popular movies
 * 3. Average movie rating
 * 4. Top 10 movies per genre
 */
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,
                 shoot: String, language: String, genres: String, actors: String, directors: String)

case class Rating(uid: Int, mid: Int, score: Double, timestamp: Int)

case class MongoConfig(uri: String, db: String)

// Base recommendation object
case class Recommendation(mid: Int, score: Double)

// A genre and its top-10 recommendation objects
case class GenresRecommendation(genres: String, recs: Seq[Recommendation])

object statisticsRecommender {

  val MONGODB_MOVIE_COLLECTION = "Movie"
  val MONGODB_RATING_COLLECTION = "Rating"

  // Target collections
  val HISTORICAL_HOT_MOVIES = "HistoricalHotMovies"
  val RECENTLY_HOT_MOVIES = "RecentlyHotMovies"
  val AVERAGE_MOVIES = "AverageMovies"
  val GENRES_TOP_MOVIES = "GenresTopMovies"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://hadoop100:27017/recommender",
      "mongo.db" -> "recommender"
    )

    // Create a SparkConf
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StatisticsRecommender")
    // Create a SparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // Load data from MongoDB:
    // 1. build a standard MongoDB connection
    // 2. read quickly through Spark
    val ratingDF = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[Rating]
      .toDF()

    val movieDF = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[Movie]
      .toDF()

    // Create a temporary view
    ratingDF.createOrReplaceTempView("ratings")

    // 1. Historical popularity: movies with the most ratings overall
    val historicalHotMoviesDF = spark.sql(
      "select mid, count(mid) as count " +
      "from ratings " +
      "group by mid")
    storeDFInMongoDB(historicalHotMoviesDF, HISTORICAL_HOT_MOVIES)

    // 2. Recent popularity, by timestamp
    // Create a date formatter
    val simpleDateFormat = new SimpleDateFormat("yyyyMM")
    // Register a UDF that converts a timestamp to a yyyyMM value
    // Note: new Date() expects milliseconds, hence * 1000L
    spark.udf.register("changeDate", (x: Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)
    // Preprocess: drop uid and convert the timestamp
    val ratingOfYearMonth = spark.sql(
      "select mid, score, changeDate(timestamp) as yearmonth " +
      "from ratings")
    ratingOfYearMonth.createOrReplaceTempView("ratingOfMonth")
    val recentlyHotMovieDF = spark.sql(
      "select mid, count(mid) as count, yearmonth from ratingOfMonth " +
      "group by yearmonth, mid order by yearmonth desc, count desc")
    storeDFInMongoDB(recentlyHotMovieDF, RECENTLY_HOT_MOVIES)

    // 3. Average rating per movie
    val averageMoviesDF = spark.sql("select mid, avg(score) as avg from ratings group by mid")
    storeDFInMongoDB(averageMoviesDF, AVERAGE_MOVIES)

    // 4. Top movies per genre
    // All movie genres
    val genres = List("Action", "Adventure", "Animation", "Comedy", "Crime", "Documentary", "Drama",
      "Family", "Fantasy", "Foreign", "History", "Horror", "Music", "Mystery", "Romance", "Science",
      "Tv", "Thriller", "War", "Western")

    // Attach the average score to the movie table; drop movies without ratings (inner join)
    val movieWithScore = movieDF.join(averageMoviesDF, "mid")

    // Cartesian product of genres and movies, filter by genre membership,
    // then aggregate by genre
    val genresRDD = spark.sparkContext.makeRDD(genres)
    val genresTopMoviesDF = genresRDD.cartesian(movieWithScore.rdd)
      .filter {
        // Keep only pairs where movie.genres contains the current genre
        case (genre, row) => row.getAs[String]("genres").toLowerCase.contains(genre.toLowerCase())
      }
      .map {
        case (genre, row) => (genre, (row.getAs[Int]("mid"), row.getAs[Double]("avg")))
      }
      .groupByKey()
      .map {
        case (genres, items) => GenresRecommendation(genres,
          items.toList.sortWith(_._2 > _._2).take(10).map(item => Recommendation(item._1, item._2)))
      }
      .toDF
    storeDFInMongoDB(genresTopMoviesDF, GENRES_TOP_MOVIES)

    spark.stop()
  }

  def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig): Unit = {
    df.write
      .option("uri", mongoConfig.uri)
      .option("collection", collection_name)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
  }
}