【电影推荐系统】统计性算法

news/2025/1/15 13:54:24/

目录

目标

历史热门电影统计

最近热门电影统计

电影平均得分统计

每个类别优质电影统计

完整代码


目标

从 MongoDB 中加载数据,将【电影平均评分统计】、【电影评分个数统计】、【最近电影评分个数统计】三个统计算法进行运行实现,并将计算结果回写到 MongoDB 中。

历史热门电影统计

根据所有历史评分数据,计算历史评分次数最多的电影。

select mid, count(mid) as count from ratings group by mid;
    //1. 历史热门统计,历史评分数最多val historicalHotMoviesDF = spark.sql("select mid, count(mid) as count " +"from ratings " +"group by mid")

最近热门电影统计

根据评分,按月为单位计算最近时间的月份里面评分数最多的电影集合。

自定义时间戳=>年月格式UDF。

spark.udf.register("changeDate",(x:Int)=>simpleDateFormat.format(new Date(x * 1000L)).toInt)

转换时间格式SQL。

select mid, score, changeDate(timestamp) as yearmonth from ratings;
select mid, count(mid) as count ,yearmonth from ratingOfMonth group by yearmonth,mid;
    //2. 近期热门统计,按时间戳//    创建一个日期格式化工具val simpleDateFormat = new SimpleDateFormat("yyyyMM")//    注册UDF,将时间戳转换成年月格式//TODO new Data() 需要 毫秒spark.udf.register("changeDate", (x: Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)//      预处理去掉uid,转换时间val ratingOfYearMonth = spark.sql("select mid, score, changeDate(timestamp) as yearmonth " +"from ratings")ratingOfYearMonth.createOrReplaceTempView("ratingOfMonth")val recentlyHotMovieDF = spark.sql("select mid, count(mid) as count, yearmonth from ratingOfMonth group by yearmonth, mid order by yearmonth desc, count desc")storeDFInMongoDB(recentlyHotMovieDF, RECENTLY_HOT_MOVIES)

电影平均得分统计

根据历史数据中所有用户对电影的评分,周期性的计算每个电影的平均得分。

select mid, avg(score) as avg from ratings group by mid

 

    val averageMoviesDF = spark.sql("select mid, avg(score) as avg from ratings group by mid")storeDFInMongoDB(averageMoviesDF, AVERAGE_MOVIES)

每个类别优质电影统计

根据提供的所有电影类别,分别计算每种类型的电影集合中评分最高的10 个电影。

在计算完整个电影的平均得分之后,将影片集合与电影类型做笛卡尔积,然后过滤掉电影类型不符合的条目。

核心代码

val genresTopMoviesDF = genresRDD.cartesian(movieWithScore.rdd).filter{case (genre, row) => row.getAs[String]("genres").toLowerCase.contains( genre.toLowerCase() )}.map{case (genre, row) => (genre, ( row.getAs[Int]("mid"), row.getAs[Double]("avg")))}.groupByKey().map{case (genres, items) => GenresRecommendation(genres, items.toList.sortWith(_._2 > _._2).take(10).map(item => Recommendation(item._1, item._2)))}.toDF

完整代码

package com.statistiscimport java.util.Dateimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}import java.text.SimpleDateFormat/*** 离线统计推荐* 1. 历史热门* 2. 最近热门* 3. 电影评分* 4. 每个类别电影的评分前十电影统计*/case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,shoot: String, language: String, genres: String, actors: String, directors: String)
case class Rating(uid: Int, mid: Int, score: Double, timestamp: Int)case class MongoConfig(uri: String, db: String)//定义电影类别评分top10样例类
//基准推荐对象
case class Recommendation(mid: Int, score: Double)case class GenresRecommendation(genres: String, recs: Seq[Recommendation]) //类别, top10基准推荐对象object statisticsRecommender {val MONGODB_MOVIE_COLLECTION = "Movie"val MONGODB_RATING_COLLECTION = "Rating"//存储表val HISTORICAL_HOT_MOVIES = "HistoricalHotMovies"val RECENTLY_HOT_MOVIES = "RecentlyHotMovies"val AVERAGE_MOVIES = "AverageMovies"val GENRES_TOP_MOVIES = "GenresTopMovies"def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://hadoop100:27017/recommender","mongo.db" -> "recommender")//创建一个sparkConfval sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StatisticsRecommeder")//创建一个SparkSessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))/*** 从mongodb加载数据* 1. 创建连接,标准的连接mongodb* 2. spark快速读取*/val ratingDF = spark.read.option("uri", mongoConfig.uri).option("collection", MONGODB_RATING_COLLECTION).format("com.mongodb.spark.sql").load().as[Rating].toDF()val movieDF = spark.read.option("uri", mongoConfig.uri).option("collection", MONGODB_MOVIE_COLLECTION).format("com.mongodb.spark.sql").load().as[Movie].toDF()//创建一个临时表ratingDF.createOrReplaceTempView("ratings")//1. 历史热门统计,历史评分数最多val historicalHotMoviesDF = spark.sql("select mid, count(mid) as count " +"from ratings " +"group by mid")storeDFInMongoDB(historicalHotMoviesDF, HISTORICAL_HOT_MOVIES)//2. 近期热门统计,按时间戳//    创建一个日期格式化工具val simpleDateFormat = new SimpleDateFormat("yyyyMM")//    注册UDF,将时间戳转换成年月格式//TODO new Data() 需要 毫秒spark.udf.register("changeDate", (x: Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)//      预处理去掉uid,转换时间val ratingOfYearMonth = spark.sql("select mid, score, changeDate(timestamp) as yearmonth " +"from ratings")ratingOfYearMonth.createOrReplaceTempView("ratingOfMonth")val recentlyHotMovieDF = spark.sql("select mid, count(mid) as count, yearmonth from ratingOfMonth group by yearmonth, mid order by yearmonth desc, count desc")storeDFInMongoDB(recentlyHotMovieDF, RECENTLY_HOT_MOVIES)//3. 统计电影的平均评分val averageMoviesDF = spark.sql("select mid, avg(score) as avg from ratings group by mid")storeDFInMongoDB(averageMoviesDF, AVERAGE_MOVIES)// 4. 各类别电影top统计//    定义电影所有的类别val genres = List("Action", "Adventure", "Animation", "Comedy", "Crime", "Documentary", "Drama", "Family", "Fantasy", "Foreign", "History", "Horror", "Music", "Mystery", "Romance", "Science", "Tv", "Thriller", "War", "Western")//把平均分加入movie表里,没有评分的不要(用inner join)val movieWithScore = movieDF.join(averageMoviesDF, "mid")/*** 判断电影类别里面是否包含 类别*///电影类别和电影名称做 笛卡尔积并过滤//根据类别聚合val genresRDD = spark.sparkContext.makeRDD(genres)val genresTopMoviesDF = genresRDD.cartesian(movieWithScore.rdd).filter{//判断movie.genres值是否包含当前的类别case (genre, row) => row.getAs[String]("genres").toLowerCase.contains( genre.toLowerCase() )}.map{case (genre, row) => (genre, ( row.getAs[Int]("mid"), row.getAs[Double]("avg")))}.groupByKey().map{case (genres, items) => GenresRecommendation(genres, items.toList.sortWith(_._2 > _._2).take(10).map(item => Recommendation(item._1, item._2)))}.toDFstoreDFInMongoDB(genresTopMoviesDF, GENRES_TOP_MOVIES)spark.stop()}def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig): Unit = {df.write.option("uri", mongoConfig.uri).option("collection", collection_name).mode("overwrite").format("com.mongodb.spark.sql").save()}}


http://www.ppmy.cn/news/764968.html

相关文章

Python实现OCR大批量识别图片文字,并将文字保存到txt文档中,文末源码直接拿!

文章目录 项目背景项目介绍运行项目代码详解源码地址 项目背景 在当今数字化时代,图像文字识别(Optical Character Recognition, OCR)技术的应用越来越广泛。 OCR技术可以将印刷体文字转化为可编辑的文本格式,从而方便进行文本分…

计算机网络设置端口转发,网件NETGEAR几款路由器端口转发功能设置方法

WPN824, RP614v2,MR814v2,WGR614,WGT624 端口转发设置实例。(以 RP614v2 为例) 1. WPN824, RP614v2,MR814v2,WGR614,WGT624 如何设置端口转发? 先登陆到设备的配置截面 在‘高级选项(Advanced…

服务器怎么设置自动拨号,网件路由器怎么设置自动拨号

网件netgear一直致力于网络技术创新,专注于产品的可靠性和易用性提升,为全球商用企业用户和家庭个人用户提供使用简便的高质量网络解决方案,那么你知道网件路由器怎么设置自动拨号吗?下面是学习啦小编整理的一些关于网件路由器设置自动拨号的…

网件R6400内网穿透最简单的实现方式

我的树莓派和ubuntu server都是放在家里的,刚开始不太懂就购买了hua sheng bang,可惜只有2条映射,由于软件多需要的端口也多,我就又购买了2个,总共6条映射。后来看了B站的视频,知道软路由和购买域名&#x…

网件使用计算机mac地址吗,找回网件R7000消失的无线MAC过滤功能

本帖最后由 zyyjcj 于 2015-2-27 20:13 编辑 想必大家R7000到手后都发现没有无线MAC过滤功能,对于平时不加密或作为AP的人来说很不方便,其实这个功能只是被隐藏了而已。 游览器登陆:http://routerlogin.net/WLG_acl.htm 发现了么:…

网件使用计算机mac地址吗,网件原生固件设置无线MAC地址过滤详细步骤

摘 要 原标题:"网件R6300v2原生固件设置无线MAC地址过滤方法"相关路由器设置经验分享。 - 来源:路由器之家  路由器具有判断网络地址和选择IP路径的功能,它能在多网络互联环境中,建" 原标题:"网件R6300v2原…

网件路由器使用计算机mac,网件路由器怎么ip与mac绑定(2)

网件路由器设置mac地址过滤的方法 1.通过启动电脑的Internet Explorer或者Netscape Navigator等其他浏览器登陆WG302的管理界面: 登录时只需输入192.168.0.228即可,且AP会要求提供用户名和密码。用户名:admin/密码:password,都是小…

网件6250刷Tomato 系统

最近手欠,总是想把路由器的系统给刷了,礼拜在家里测试 路由器:Netgear R6250 升级前,最好把本机的环境设置为:192.168.1.2/255.255.255.0 在网络上上找了许长时间,也测试了多款系统,一点击&…