import os
配置spark driver和pyspark运行时,所使用的python解释器路径
由于miniconda3中默认存在一个python3.7的版本,jupyter默认也使用的是这个版本,故:设置pyspark的解释器为miniconda3的解释器
PYSPARK_PYTHON = “/root/miniconda3/bin/python3”
JAVA_HOME=’/root/bigdata/jdk1.8.0_181’
当存在多个版本时,不指定很可能会导致出错
os.environ[“PYSPARK_PYTHON”] = PYSPARK_PYTHON
os.environ[“PYSPARK_DRIVER_PYTHON”] = PYSPARK_PYTHON
os.environ[‘JAVA_HOME’]=JAVA_HOME
spark配置信息
from pyspark import SparkConf
from pyspark.sql import SparkSession
SPARK_APP_NAME = “preprocessingBehaviorLog”
SPARK_URL = “spark://192.168.199.126:7077”
conf = SparkConf() # 创建spark config对象
config = (
(“spark.app.name”, SPARK_APP_NAME), # 设置启动的spark的app名称,没有提供,将随机产生一个名称
(“spark.executor.memory”, “2g”), # 设置该app启动时占用的内存用量,默认1g
(“spark.master”, SPARK_URL), # spark master的地址
(“spark.executor.cores”, “2”), # 设置spark executor使用的CPU核心数
# 以下三项配置,可以控制执行器数量
(“spark.dynamicAllocation.enabled”, True),
(“spark.dynamicAllocation.initialExecutors”, 1), # 1个执行器
(“spark.shuffle.service.enabled”, True)
(‘spark.sql.pivotMaxValues’, ‘99999’), # 当需要pivot DF,且值很多时,需要修改,默认是10000
)
查看更详细配置及说明:https://spark.apache.org/docs/latest/configuration.html
conf.setAll(config)
利用config对象,创建spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
从hdfs中加载csv文件为DataFrame
从hdfs加载CSV文件为DataFrame
df = spark.read.csv(“file:///root/jupyter_code/behavior_log_less.csv”, header=True)
df.show() # 查看dataframe,默认显示前20条
大致查看一下数据类型
df.printSchema() # 打印当前dataframe的结构
从hdfs加载数据为dataframe,并设置结构
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
构建结构对象
schema = StructType([
StructField(“userId”, IntegerType()),
StructField(“timestamp”, LongType()),
StructField(“btag”, StringType()),
StructField(“cateId”, IntegerType()),
StructField(“brandId”, IntegerType())
])
从hdfs加载数据为dataframe,并设置结构
behavior_log_df = spark.read.csv(“file:///root/jupyter_code/behavior_log_less.csv”, header=True, schema=schema)
behavior_log_df.show()
behavior_log_df.count()
分析数据集字段的类型和格式
查看是否有空值
查看每列数据的类型
查看每列数据的类别情况
print(“查看userId的数据情况:”, behavior_log_df.groupBy(“userId”).count().count())
约113w用户
#注意:behavior_log_df.groupBy(“userId”).count() 返回的是一个dataframe,这里的count计算的是每一个分组的个数,但当前还没有进行计算
当调用df.count()时才开始进行计算,这里的count计算的是dataframe的条目数,也就是共有多少个分组
print(“查看btag的数据情况:”, behavior_log_df.groupBy(“btag”).count().collect()) # collect会把计算结果全部加载到内存,谨慎使用
只有四种类型数据:pv、fav、cart、buy
这里由于类型只有四个,所以直接使用collect,把数据全部加载出来
pivot透视操作,把某列里的字段值转换成行并进行聚合运算(pyspark.sql.GroupedData.pivot)
如果透视的字段中的不同属性值超过10000个,则需要设置spark.sql.pivotMaxValues,否则计算过程中会出现错误。文档介绍。
统计每个用户对各类商品的pv、fav、cart、buy数量
cate_count_df = behavior_log_df.groupBy(behavior_log_df.userId, behavior_log_df.cateId).pivot(“btag”,[“pv”,“fav”,“cart”,“buy”]).count()
cate_count_df.printSchema() # 此时还没有开始计算
统计每个用户对各个品牌的pv、fav、cart、buy数量并保存结果
统计每个用户对各个品牌的pv、fav、cart、buy数量
brand_count_df = behavior_log_df.groupBy(behavior_log_df.userId, behavior_log_df.brandId).pivot(“btag”,[“pv”,“fav”,“cart”,“buy”]).count()
brand_count_df.show() # 同上
113w * 46w
由于运算时间比较长,所以这里先将结果存储起来,供后续其他操作使用
写入数据时才开始计算
cate_count_df.write.csv(“file:///root/jupyter_code1/middle_result/preprocessing_dataset/cate_count.csv”, header=True)
brand_count_df.write.csv("file:///root/jupyter_code1/middle_result/preprocessing_dataset/brand_cou
7.2 根据用户对类目偏好打分训练ALS模型
根据您统计的次数 + 打分规则 ==> 偏好打分数据集 ==> ALS模型
spark ml的模型训练是基于内存的,如果数据过大,内存空间小,迭代次数过多的话,可能会造成内存溢出,报错
设置Checkpoint的话,会把所有数据落盘,这样如果异常退出,下次重启后,可以接着上次的训练节点继续运行
但该方法其实指标不治本,因为无法防止内存溢出,所以还是会报错
如果数据量大,应考虑的是增加内存、或限制迭代次数和训练数据量级等
spark.sparkContext.setCheckpointDir(“hdfs://node-teach1:8020/checkPoint/”)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType
构建结构对象
schema = StructType([
StructField(“userId”, IntegerType()),
StructField(“cateId”, IntegerType()),
StructField(“pv”, IntegerType()),
StructField(“fav”, IntegerType()),
StructField(“cart”, IntegerType()),
StructField(“buy”, IntegerType())
])
从hdfs加载CSV文件
cate_count_df = spark.read.csv(“file:///root/jupyter_code1/middle_result/preprocessing_dataset/cate_count.csv”, header=True, schema=schema)
cate_count_df.printSchema()
cate_count_df.first() # 第一行数据
处理每一行数据:r表示row对象
def process_row®:
# 处理每一行数据:r表示row对象
# 偏好评分规则:
# m: 用户对应的行为次数
# 该偏好权重比例,次数上限仅供参考,具体数值应根据产品业务场景权衡
# pv: if m<=20: score=0.2*m; else score=4
# fav: if m<=20: score=0.4*m; else score=8
# cart: if m<=20: score=0.6*m; else score=12
# buy: if m<=20: score=1*m; else score=20# 注意这里要全部设为浮点数,spark运算时对类型比较敏感,要保持数据类型都一致
pv_count = r.pv if r.pv else 0.0
fav_count = r.fav if r.fav else 0.0
cart_count = r.cart if r.cart else 0.0
buy_count = r.buy if r.buy else 0.0pv_score = 0.2*pv_count if pv_count<=20 else 4.0
fav_score = 0.4*fav_count if fav_count<=20 else 8.0
cart_score = 0.6*cart_count if cart_count<=20 else 12.0
buy_score = 1.0*buy_count if buy_count<=20 else 20.0rating = pv_score + fav_score + cart_score + buy_score
# 返回用户ID、分类ID、用户对分类的偏好打分
return r.userId, r.cateId, rating
返回一个PythonRDD类型
返回一个PythonRDD类型,此时还没开始计算
cate_count_df.rdd.map(process_row).toDF([“userId”, “cateId”, “rating”])
用户对商品类别的打分数据
用户对商品类别的打分数据
map返回的结果是rdd类型,需要调用toDF方法转换为Dataframe
cate_rating_df = cate_count_df.rdd.map(process_row).toDF([“userId”, “cateId”, “rating”])
注意:toDF不是每个rdd都有的方法,仅局限于此处的rdd
可通过该方法获得 user-cate-matrix
但由于cateId字段过多,这里运算量比很大,机器内存要求很高才能执行,否则无法完成任务
请谨慎使用
但好在我们训练ALS模型时,不需要转换为user-cate-matrix,所以这里可以不用运行
cate_rating_df.groupBy(“userId”).povit(“cateId”).min(“rating”)
用户对类别的偏好打分数据
cate_rating_df
使用pyspark中的ALS矩阵分解方法实现CF评分预测
文档地址:https://spark.apache.org/docs/2.2.2/api/python/pyspark.ml.html?highlight=vectors#module-pyspark.ml.recommendation
from pyspark.ml.recommendation import ALS # ml:dataframe, mllib:rdd
利用打分数据,训练ALS模型
checkpointInterval:每迭代n次,就会做一次cache,为了处理内存不足造成的问题。
als = ALS(userCol=‘userId’, itemCol=‘cateId’, ratingCol=‘rating’, checkpointInterval=2)
此处训练时间较长
model = als.fit(cate_rating_df)
模型训练好后,调用方法进行使用,具体API查看
model.recommendForAllUsers(N) 给所有用户推荐TOP-N个物品
ret = model.recommendForAllUsers(3)
由于是给所有用户进行推荐,此处运算时间也较长
ret.show()
推荐结果存放在recommendations列中,
ret.select(“recommendations”).show()
model.recommendForUserSubset 给部分用户推荐TOP-N个物品
注意:recommendForUserSubset API,2.2.2版本中无法使用
dataset = spark.createDataFrame([[1],[2],[3]])
dataset = dataset.withColumnRenamed("_1", “userId”)
ret = model.recommendForUserSubset(dataset, 3)
只给部分用推荐,运算时间短
ret.show()
ret.collect() # 注意: collect会将所有数据加载到内存,慎用
transform中提供userId和cateId可以对打分进行预测,利用打分结果排序后
将模型进行存储
model.save(“file:///root/jupyter_code1/tmp_models/userCateRatingALSModel.obj”)
测试存储的模型
from pyspark.ml.recommendation import ALSModel
从hdfs加载之前存储的模型
als_model = ALSModel.load(“file:///root/jupyter_code1/tmp_models/userCateRatingALSModel.obj”)
model.recommendForAllUsers(N) 给用户推荐TOP-N个物品
result = als_model.recommendForAllUsers(3)
result.show()