[Movie Recommendation System] Data Loading


Table of Contents

Dataset

Explanation

movie.csv

ratings.csv

tag.csv

Data Preprocessing

MongoDB

Split each record on the CSV file's delimiter and convert it to a DataFrame

Movie

Rating

Tag

ES

Collapse the Mongo tag collection by mid: (mid, tag) => (mid, tags(tag1|tag2|tag3...))

Add a tags column to movie

Database contents after import

MongoDB

Movie

Rating

Tag

ES

Complete Code


Dataset

Link: https://pan.baidu.com/s/1OjJ66PsES-qsun7sV_OeJA?pwd=2580
Extraction code: 2580

Explanation

movie.csv

1^Toy Story (1995)^ ^81 minutes^March 20, 2001^1995^English ^Adventure|Animation|Children|Comedy|Fantasy ^Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn|John Ratzenberger|Annie Potts|John Morris|Erik von Detten|Laurie Metcalf|R. Lee Ermey|Sarah Freeman|Penn Jillette|Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn ^John Lasseter

One record per line, with fields separated by ^: mid, name, descri, timelong, issue, shoot, language, genres, actors, directors. Multi-valued fields (genres, actors) are themselves separated by |.
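As a quick sanity check (a hypothetical REPL snippet, not part of the loader), splitting a shortened sample line shows why the delimiter must be escaped: String.split takes a regular expression, and a bare ^ is the start-of-string anchor rather than a literal caret.

    val line = "1^Toy Story (1995)^ ^81 minutes^March 20, 2001^1995^English"
    line.split("^").length   // 1: as a regex, ^ matches nothing to split on
    line.split("\\^").length // 7: the escaped form splits on the literal caret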

ratings.csv

1,31,2.5,1260759144

Comma-separated fields: uid, mid, score, timestamp.

tag.csv

1,31,action,1260759144

Comma-separated fields: uid, mid, tag, timestamp.

 

Data Preprocessing

MongoDB

Split each record on the CSV file's delimiter and convert it to a DataFrame

Core code

Movie

    // Load the raw data
    val movieRDD = spark.sparkContext.textFile(MOVIE_DATA_PATH)
    // Preprocess: split each line into an array, then wrap it in a Movie case class
    // e.g. 1^Toy Story (1995)^ ^81 minutes^March 20, 2001^1995^English ^Adventure|Animation|Children|Comedy|Fantasy ^Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn|John Ratzenberger|Annie Potts|John Morris|Erik von Detten|Laurie Metcalf|R. Lee Ermey|Sarah Freeman|Penn Jillette|Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn ^John Lasseter
    val movieDF = movieRDD.map(item => {
      // split() takes a regex, so the '^' delimiter must be escaped as \\^
      val s = item.split("\\^")
      // Scala arrays are indexed with (); fields usually need .trim after splitting
      Movie(s(0).toInt, s(1).trim, s(2).trim, s(3).trim, s(4).trim,
        s(5).trim, s(6).trim, s(7).trim, s(8).trim, s(9).trim)
    }).toDF()

Resulting MongoDB document:

{ "_id" : ObjectId("644b85b62ecfa735d034ce31"), "mid" : 1, "name" : "Toy Story (1995)", "descri" : "", "timelong" : "81 minutes", "issue" : "March 20, 2001", "shoot" : "1995", "language" : "English", "genres" : "Adventure|Animation|Children|Comedy|Fantasy", "actors" : "Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn|John Ratzenberger|Annie Potts|John Morris|Erik von Detten|Laurie Metcalf|R. Lee Ermey|Sarah Freeman|Penn Jillette|Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn", "directors" : "John Lasseter" }

Rating

    val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)
    // 1,31,2.5,1260759144
    val ratingDF = ratingRDD.map(item => {
      val s = item.split(",")
      Rating(s(0).toInt, s(1).toInt, s(2).toDouble, s(3).toInt)
    }).toDF()

Resulting MongoDB document:

{ "_id" : ObjectId("644b85b62ecfa735d034d83f"), "uid" : 1, "mid" : 31, "score" : 2.5, "timestamp" : 1260759144 }

Tag

    // 15,1955,dentist,1193435061
    val tagRDD = spark.sparkContext.textFile(TAG_DATA_PATH)
    val tagDF = tagRDD.map(item => {
      val s = item.split(",")
      Tag(s(0).toInt, s(1).toInt, s(2).trim, s(3).toInt)
    }).toDF()

Resulting MongoDB document:

{ "_id" : ObjectId("644b85b72ecfa735d035854f"), "uid" : 15, "mid" : 1955, "tag" : "dentist", "timestamp" : 1193435061 }

ES

Collapse the Mongo tag collection by mid: (mid, tag) rows become (mid, tags), where tags = tag1|tag2|tag3...

This produces:

mid1 tags(tag1|tag2|tag3...)

mid2 tags(tag1|tag2|tag3...)

mid3 tags(tag1|tag2|tag3...)

mid4 tags(tag1|tag2|tag3...) ...

Core code

    /* mid tags
     * tags: tag1|tag2|tag3
     */
    val newTag = tagDF.groupBy($"mid")
      // agg is usually paired with groupBy for per-group aggregation;
      // used on its own, it aggregates over the whole DataFrame
      .agg(concat_ws("|", collect_set($"tag")).as("tags"))
      .select("mid", "tags")

Equivalent HQL:

select mid, concat_ws("|",collect_set(tag)) as tags
from tag
group by mid;
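For reference, the same aggregation can also be run through spark.sql by registering tagDF as a temporary view first — a minimal sketch under the same SparkSession, not part of the original loader:

    tagDF.createOrReplaceTempView("tag")
    val newTagSql = spark.sql(
      """select mid, concat_ws('|', collect_set(tag)) as tags
        |from tag
        |group by mid""".stripMargin)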

Add a tags column to movie

Core code

val newMovieDF = movieDF.join(newTag, Seq("mid"), "left")

Equivalent HQL:

select *
from movie m
left join newTag t
on m.mid = t.mid;
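Note that movies with no tags at all end up with a null tags column after the left join. If nulls are unwanted downstream (for example when indexing into ES), they can be replaced first — a hedged sketch, not in the original code:

    // Replace null tags with an empty string before writing out
    val filledMovieDF = newMovieDF.na.fill("", Seq("tags"))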

Database contents after import

MongoDB

sudo ./bin/mongod -config ./data/mongodb.conf
show dbs
use <database name>
show collections
db.<collection name>.find()
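After the loader has run, the collections and the indexes it created can be spot-checked from the shell — illustrative commands, assuming the database is named recommender as configured in the code:

use recommender
db.Movie.count()
db.Movie.getIndexes()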

Movie

 { "_id" : ObjectId("644b85b62ecfa735d034ce31"), "mid" : 1, "name" : "Toy Story (1995)", "descri" : "", "timelong" : "81 minutes", "issue" : "March 20, 2001", "shoot" : "1995", "language" : "English", "genres" : "Adventure|Animation|Children|Comedy|Fantasy", "actors" : "Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn|John Ratzenberger|Annie Potts|John Morris|Erik von Detten|Laurie Metcalf|R. Lee Ermey|Sarah Freeman|Penn Jillette|Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn", "directors" : "John Lasseter" }

Rating

{ "_id" : ObjectId("644b85b62ecfa735d034d83f"), "uid" : 1, "mid" : 31, "score" : 2.5, "timestamp" : 1260759144 }

 

Tag

 { "_id" : ObjectId("644b85b72ecfa735d035854f"), "uid" : 15, "mid" : 1955, "tag" : "dentist", "timestamp" : 1193435061 }

ES

./bin/elasticsearch -d
curl http://hadoop100:9200/
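Once the write finishes, the indexed documents can be spot-checked over HTTP — for example (assuming the index recommender and type Movie, as configured in the code):

curl "http://hadoop100:9200/recommender/Movie/_search?q=mid:1&pretty"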

 

Complete Code

package com.qh.recommender

import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient

import java.net.InetAddress

/**
 * Case classes the raw records are parsed into
 */
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,
                 shoot: String, language: String, genres: String, actors: String, directors: String)

case class Rating(uid: Int, mid: Int, score: Double, timestamp: Int)

case class Tag(uid: Int, mid: Int, tag: String, timestamp: Int)

/**
 * Case classes wrapping the MongoDB and ES configuration
 */

/**
 * @param uri MongoDB connection URI
 * @param db  MongoDB database
 */
case class MongoConfig(uri: String, db: String)

/**
 * @param httpHosts      comma-separated list of HTTP hosts (port 9200)
 * @param transportHosts transport host list (the port used for intra-cluster transport)
 * @param index          the index to operate on
 * @param clustername    cluster name (the cluster_name configured when ES starts);
 *                       query via Movie/_search
 */
case class ESConfig(httpHosts: String, transportHosts: String, index: String, clustername: String)

object DataLoader {

  val MOVIE_DATA_PATH = "E:\\project\\BigData\\MovieSystem\\recommender\\DataLoader\\src\\main\\resources\\movies.csv"
  val RATING_DATA_PATH = "E:\\project\\BigData\\MovieSystem\\recommender\\DataLoader\\src\\main\\resources\\ratings.csv"
  val TAG_DATA_PATH = "E:\\project\\BigData\\MovieSystem\\recommender\\DataLoader\\src\\main\\resources\\tags.csv"

  // Collection / index name parameters
  val MONGODB_MOVIE_COLLECTION = "Movie"
  val MONGODB_RATING_COLLECTION = "Rating"
  val MONGODB_TAG_COLLECTION = "Tag"
  val ES_MOVIE_INDEX = "Movie"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://hadoop100:27017/recommender",
      "mongo.db" -> "recommender",
      "es.httpHosts" -> "hadoop100:9200",
      "es.transportHosts" -> "hadoop100:9300",
      "es.index" -> "recommender",
      "es.cluster.name" -> "es-cluster"
    )

    // Create a SparkConf
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")

    // Create a SparkSession
    // builder's getOrCreate returns the existing session if there is one, otherwise creates a new one
    // https://zhuanlan.zhihu.com/p/343668901
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    // rdd => df / ds conversions
    // https://wenku.csdn.net/answer/6b3d109ee8d01601ccd9ac1944772477
    import spark.implicits._

    // Load the raw data
    val movieRDD = spark.sparkContext.textFile(MOVIE_DATA_PATH)
    // Preprocess: split each line into an array, then wrap it in a Movie case class
    // e.g. 1^Toy Story (1995)^ ^81 minutes^March 20, 2001^1995^English ^Adventure|Animation|Children|Comedy|Fantasy ^Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn|John Ratzenberger|Annie Potts|John Morris|Erik von Detten|Laurie Metcalf|R. Lee Ermey|Sarah Freeman|Penn Jillette|Tom Hanks|Tim Allen|Don Rickles|Jim Varney|Wallace Shawn ^John Lasseter
    val movieDF = movieRDD.map(item => {
      // split() takes a regex, so the '^' delimiter must be escaped as \\^
      val s = item.split("\\^")
      // Scala arrays are indexed with (); fields usually need .trim after splitting
      Movie(s(0).toInt, s(1).trim, s(2).trim, s(3).trim, s(4).trim,
        s(5).trim, s(6).trim, s(7).trim, s(8).trim, s(9).trim)
    }).toDF()

    val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)
    // 1,31,2.5,1260759144
    val ratingDF = ratingRDD.map(item => {
      val s = item.split(",")
      Rating(s(0).toInt, s(1).toInt, s(2).toDouble, s(3).toInt)
    }).toDF()

    // 15,1955,dentist,1193435061
    val tagRDD = spark.sparkContext.textFile(TAG_DATA_PATH)
    val tagDF = tagRDD.map(item => {
      val s = item.split(",")
      Tag(s(0).toInt, s(1).toInt, s(2).trim, s(3).toInt)
    }).toDF()

    // Configuration that must be passed everywhere and never changes can be declared implicit
    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // Save to MongoDB
    storeDataInMongoDB(movieDF, ratingDF, tagDF)

    // Preprocessing: attach each movie's tags as an extra String column, tag1|tag2|...
    // Using Spark SQL:
    // org.apache.spark.sql.functions is an object providing roughly two hundred functions,
    // most of them similar to Hive's. Apart from UDFs, all can be used directly in spark-sql.
    // After `import org.apache.spark.sql.functions._` they can also be used on DataFrame/Dataset.
    // Most Column functions also accept String column names; their return type is generally Column.
    // https://blog.csdn.net/qq_33887096/article/details/114532707
    // In short: to use these functions, import the object
    import org.apache.spark.sql.functions._

    // groupBy(cols); in Scala a column can be written as $"colName"
    /**
     * Using Hive-style functions:
     * in tagDF, join all tags with the same mid into a tags column with |
     * mid tags
     * tags: tag1|tag2|tag3
     */
    // select mid, concat_ws("|", collect_set(tag)) as tags
    // from tag
    // group by mid;
    val newTag = tagDF.groupBy($"mid")
      // agg is usually paired with groupBy for per-group aggregation;
      // used on its own, it aggregates over the whole DataFrame
      .agg(concat_ws("|", collect_set($"tag")).as("tags"))
      .select("mid", "tags")

    /**
     * Left-outer-join newTag onto movie
     * (join defaults to an inner join)
     */
    // select *
    // from movie m
    // left join newTag t
    // on m.mid = t.mid;
    val newMovieDF = movieDF.join(newTag, Seq("mid"), "left")

    implicit val esConfig = ESConfig(config("es.httpHosts"), config("es.transportHosts"),
      config("es.index"), config("es.cluster.name"))

    // Save to ES
    storeDataInEs(newMovieDF)

    spark.stop()
  }

  def storeDataInMongoDB(movieDF: DataFrame, ratingDF: DataFrame, tagDF: DataFrame)(implicit mongoConfig: MongoConfig): Unit = {
    // Open a MongoDB connection
    val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))

    // If the collections already exist in MongoDB, drop them first
    mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).dropCollection()
    mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).dropCollection()
    mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).dropCollection()

    // Write each DataFrame into its MongoDB collection
    movieDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_COLLECTION)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
    ratingDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
    tagDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_TAG_COLLECTION)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    // Create indexes on the collections
    mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
    mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("uid" -> 1))
    mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
    mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).createIndex(MongoDBObject("uid" -> 1))
    mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).createIndex(MongoDBObject("mid" -> 1))

    mongoClient.close()
  }

  def storeDataInEs(movieDF: DataFrame)(implicit esConfig: ESConfig): Unit = {
    // Build the ES settings
    val settings: Settings = Settings.builder().put("cluster.name", esConfig.clustername).build()

    // Create an ES client
    val esClient = new PreBuiltTransportClient(settings)

    // Regex for splitting host:port pairs
    val REGEX_HOST_PORT = "(.+):(\\d+)".r
    // Register the cluster hosts and ports
    esConfig.transportHosts.split(",").foreach {
      case REGEX_HOST_PORT(host: String, port: String) => {
        // ES's InetSocketTransportAddress takes a java.net.InetAddress
        esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port.toInt))
      }
    }

    // Clean up any leftover index
    if (esClient.admin().indices().exists(new IndicesExistsRequest(esConfig.index)).actionGet().isExists) {
      esClient.admin().indices().delete(new DeleteIndexRequest(esConfig.index))
    }
    // Create the index
    esClient.admin().indices().create(new CreateIndexRequest(esConfig.index))

    // Write the data
    movieDF.write
      .option("es.nodes", esConfig.httpHosts)
      .option("es.http.timeout", "100m")
      .option("es.mapping.id", "mid")
      .mode("overwrite")
      .format("org.elasticsearch.spark.sql")
      .save(esConfig.index + "/" + ES_MOVIE_INDEX)
  }
}

Source: 尚硅谷


