五、 离线推荐数据缓存

news/2024/11/16 7:42:47/

五 离线推荐数据缓存

5.1离线数据缓存之离线召回集

  • 这里主要是利用我们前面训练的ALS模型进行协同过滤召回,但是注意,我们ALS模型召回的是用户最感兴趣的类别,而我们需要的是用户可能感兴趣的广告的集合,因此我们还需要根据召回的类别匹配出对应的广告。

    所以这里我们除了需要我们训练的ALS模型以外,还需要有一个广告和类别的对应关系。

# 从HDFS中加载广告基本信息数据,返回spark dafaframe对象
df = spark.read.csv("hdfs://localhost:8020/csv/ad_feature.csv", header=True)# 注意:由于本数据集中存在NULL字样的数据,无法直接设置schema,只能先将NULL类型的数据处理掉,然后进行类型转换from pyspark.sql.types import StructType, StructField, IntegerType, FloatType# 替换掉NULL字符串,替换掉
df = df.replace("NULL", "-1")# 更改df表结构:更改列类型和列名称
ad_feature_df = df.\withColumn("adgroup_id", df.adgroup_id.cast(IntegerType())).withColumnRenamed("adgroup_id", "adgroupId").\withColumn("cate_id", df.cate_id.cast(IntegerType())).withColumnRenamed("cate_id", "cateId").\withColumn("campaign_id", df.campaign_id.cast(IntegerType())).withColumnRenamed("campaign_id", "campaignId").\withColumn("customer", df.customer.cast(IntegerType())).withColumnRenamed("customer", "customerId").\withColumn("brand", df.brand.cast(IntegerType())).withColumnRenamed("brand", "brandId").\withColumn("price", df.price.cast(FloatType()))# 这里我们只需要adgroupId、和cateId
_ = ad_feature_df.select("adgroupId", "cateId")
# 由于这里数据集其实很少,所以我们再直接转成Pandas dataframe来处理,把数据载入内存
pdf = _.toPandas()# 手动释放一些内存
del df
del ad_feature_df
del _
import gc
gc.collect()
  • 根据指定的类别找到对应的广告
import numpy as np
pdf.where(pdf.cateId==11156).dropna().adgroupIdnp.random.choice(pdf.where(pdf.cateId==11156).dropna().adgroupId.astype(np.int64), 200)

显示结果:

313       138953.0
314       467512.0
1661      140008.0
1666      238772.0
1669      237471.0
1670      238761.0...   
843456    352273.0
846728    818681.0
846729    838953.0
846810    845337.0
Name: adgroupId, Length: 731, dtype: float64
  • 利用ALS模型进行类别的召回
# 加载als模型,注意必须先有spark上下文管理器,即sparkContext,但这里sparkSession创建后,自动创建了sparkContextfrom pyspark.ml.recommendation import ALSModel
# 从hdfs加载之前存储的模型
als_model = ALSModel.load("hdfs://localhost:8020/models/userCateRatingALSModel.obj")
# 返回模型中关于用户的所有属性   df:   id   features
als_model.userFactors

显示结果:

DataFrame[id: int, features: array<float>]
import pandas as pd
cateId_df = pd.DataFrame(pdf.cateId.unique(),columns=["cateId"])
cateId_df

显示结果:

	cateId
0	1
1	2
2	3
3	4
4	5
5	6
6	7
...	...
6766	12948
6767	12955
6768	12960
6769 rows × 1 columns
cateId_df.insert(0, "userId", np.array([8 for i in range(6769)]))
cateId_df

显示结果:

 userId cateId
0	8	1
1	8	2
2	8	3
3	8	4
4	8	5
...	...	...
6766	8	12948
6767	8	12955
6768	8	12960
6769 rows × 2 columns
  • 传入 userid、cataId的df,对应预测值进行排序
als_model.transform(spark.createDataFrame(cateId_df)).sort("prediction", ascending=False).na.drop().show()

显示结果:

+------+------+----------+
|userId|cateId|prediction|
+------+------+----------+
|     8|  7214|  9.917084|
|     8|   877|  7.479664|
|     8|  7266| 7.4762917|
|     8| 10856| 7.3395424|
|     8|  4766|  7.149538|
|     8|  7282| 6.6835284|
|     8|  7270| 6.2145095|
|     8|   201| 6.0623236|
|     8|  4267| 5.9155636|
|     8|  7267|  5.838009|
|     8|  5392| 5.6882005|
|     8|  6261| 5.6804466|
|     8|  6306| 5.2992325|
|     8| 11050|  5.245261|
|     8|  8655| 5.1701374|
|     8|  4610|  5.139578|
|     8|   932|   5.12694|
|     8| 12276| 5.0776596|
|     8|  8071|  4.979195|
|     8|  6580| 4.8523283|
+------+------+----------+
only showing top 20 rows
import numpy as np
import pandas as pdimport redis# 存储用户召回,使用redis第9号数据库,类型:sets类型
client = redis.StrictRedis(host="192.168.199.188", port=6379, db=9)for r in als_model.userFactors.select("id").collect():userId = r.idcateId_df = pd.DataFrame(pdf.cateId.unique(),columns=["cateId"])cateId_df.insert(0, "userId", np.array([userId for i in range(6769)]))ret = set()# 利用模型,传入datasets(userId, cateId),这里控制了userId一样,所以相当于是在求某用户对所有分类的兴趣程度cateId_list = als_model.transform(spark.createDataFrame(cateId_df)).sort("prediction", ascending=False).na.drop()# 从前20个分类中选出500个进行召回for i in cateId_list.head(20):need = 500 - len(ret)    # 如果不足500个,那么随机选出need个广告ret = ret.union(np.random.choice(pdf.where(pdf.cateId==i.cateId).adgroupId.dropna().astype(np.int64), need))if len(ret) >= 500:    # 如果达到500个则退出breakclient.sadd(userId, *ret)# 如果redis所在机器,内存不足,会抛出异常

5.2 离线数据缓存之离线特征

# "pid", 广告资源位,属于场景特征,也就是说,每一种广告通常是可以防止在多种资源外下的
# 因此这里对于pid,应该是由广告系统发起推荐请求时,向推荐系统明确要推荐的用户是谁,以及对应的资源位,或者说有哪些
# 这样如果有多个资源位,那么每个资源位都会对应相应的一个推荐列表# 需要进行缓存的特征值feature_cols_from_ad = ["price"    # 来自广告基本信息中
]# 用户特征
feature_cols_from_user = ["cms_group_id","final_gender_code","age_level","shopping_level","occupation","pvalue_level","new_user_class_level"
]
  • 从HDFS中加载广告基本信息数据
_ad_feature_df = spark.read.csv("hdfs://localhost:9000/datasets/ad_feature.csv", header=True)# 更改表结构,转换为对应的数据类型
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType# 替换掉NULL字符串
_ad_feature_df = _ad_feature_df.replace("NULL", "-1")# 更改df表结构:更改列类型和列名称
ad_feature_df = _ad_feature_df.\withColumn("adgroup_id", _ad_feature_df.adgroup_id.cast(IntegerType())).withColumnRenamed("adgroup_id", "adgroupId").\withColumn("cate_id", _ad_feature_df.cate_id.cast(IntegerType())).withColumnRenamed("cate_id", "cateId").\withColumn("campaign_id", _ad_feature_df.campaign_id.cast(IntegerType())).withColumnRenamed("campaign_id", "campaignId").\withColumn("customer", _ad_feature_df.customer.cast(IntegerType())).withColumnRenamed("customer", "customerId").\withColumn("brand", _ad_feature_df.brand.cast(IntegerType())).withColumnRenamed("brand", "brandId").\withColumn("price", _ad_feature_df.price.cast(FloatType()))def foreachPartition(partition):import redisimport jsonclient = redis.StrictRedis(host="192.168.199.188", port=6379, db=10)for r in partition:data = {"price": r.price}# 转成json字符串再保存,能保证数据再次倒出来时,能有效的转换成python类型client.hset("ad_features", r.adgroupId, json.dumps(data))ad_feature_df.foreachPartition(foreachPartition)
  • 从HDFS加载用户基本信息数据
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType# 构建表结构schema对象
schema = StructType([StructField("userId", IntegerType()),StructField("cms_segid", IntegerType()),StructField("cms_group_id", IntegerType()),StructField("final_gender_code", IntegerType()),StructField("age_level", IntegerType()),StructField("pvalue_level", IntegerType()),StructField("shopping_level", IntegerType()),StructField("occupation", IntegerType()),StructField("new_user_class_level", IntegerType())
])
# 利用schema从hdfs加载
user_profile_df = spark.read.csv("hdfs://localhost:8020/csv/user_profile.csv", header=True, schema=schema)
user_profile_df

显示结果:

DataFrame[userId: int, cms_segid: int, cms_group_id: int, final_gender_code: int, age_level: int, pvalue_level: int, shopping_level: int, occupation: int, new_user_class_level: int]
def foreachPartition2(partition):import redisimport jsonclient = redis.StrictRedis(host="192.168.199.8", port=6379, db=10)for r in partition:data = {"cms_group_id": r.cms_group_id,"final_gender_code": r.final_gender_code,"age_level": r.age_level,"shopping_level": r.shopping_level,"occupation": r.occupation,"pvalue_level": r.pvalue_level,"new_user_class_level": r.new_user_class_level}# 转成json字符串再保存,能保证数据再次倒出来时,能有效的转换成python类型client.hset("user_features", r.userId, json.dumps(data))user_profile_df.foreachPartition(foreachPartition2)

http://www.ppmy.cn/news/1028090.html

相关文章

MFC 多语言对话框

可以直接看一下bilibili的这个本人录制的视频&#xff1a;MFC资源多语言_哔哩哔哩_bilibili 这里所说的多语言也是国际化 新建一个MFC项目&#xff0c;我这边是中文简体&#xff0c;如果想加入其他语言&#xff0c;方法如下&#xff1a; 修改完这些之后&#xff0c;需要在代码…

左值引用和右值引用

辨析引用和指针 代码段 // 定义引用变量的方法// 首先&#xff0c;定义一个指针变量 int a 1; int * p &a; // 然后&#xff0c;将&符号移动到*符号的位置&#xff0c;覆盖*符号 int a 1; int & p a; // int * p &a; > int & p a; // 按照这种…

一篇文章带你了解Java发送邮件:使用JavaMail API发送电子邮件的注意事项、发送附件等

Java发送邮件&#xff1a;使用JavaMail API发送电子邮件 作者&#xff1a;Stevedash 发表于&#xff1a;2023年8月13日 15点48分 来源&#xff1a;Java 发送邮件 | 菜鸟教程 (runoob.com) 电子邮件在现代通信中扮演着至关重要的角色&#xff0c;而在Java编程中&#xff0c;…

Cenos7 搭建Minio集群部署服务器(一)

------> 道 | 法 | 术 | 器 | 势 <------ 多台服务器间免密登录|免密拷贝 Cenos7 搭建Minio集群部署服务器(一) 企业级开源对象存储(看看官网吹的牛B) 开源为云提供动力。开源为企业提供动力。开源为 MinIO 提供支持。每天都有成千上万的客户和社区成员信任 Mi…

excel表格处理报空指针异常问题

1.问题描述 1.1工具 使用的excel工具是hutool-5.1.0-jar&#xff0c;调用的方法是 ExcelUtil.getBigWriter("文件路径") 代码在本地执行的时候一切正常&#xff0c;表格也能正常生成&#xff0c;但是放到docker内部署的时候总是报空指针异常&#xff0c;就是运行…

airflow是什么

Airflow 简介 Airflow是一个基于有向无环图(DAG)的可编程、调度和监控的工作流平台&#xff0c;它可以定义一组有依赖的任务&#xff0c;按照依赖依次执行。airflow提供了丰富的命令行工具用于系统管控&#xff0c;而其web管理界面同样也可以方便的管控调度任务&#xff0c;并…

Vue生命周期函数 详解

以下是Vue生命周期函数的流程图和每个周期的代码详解&#xff1a; 流程图&#xff1a; beforeCreate -> created -> beforeMount -> mounted -> beforeUpdate -> updated -> beforeDestroy -> destroyed详解&#xff1a; beforeCreate&#xff1a; 触发时…

Kubernetes 调度约束(亲和性、污点、容忍)

目录 一、Pod启动典型创建过程 二、调度流程 三、指定调度节点 1.使用nodeName字段指定调度节点 2.使用nodeSelector指定调度节点 2.1给对应的node节点添加标签 2.2修改为nodeSelector调度方式 3.通过亲和性来指定调度节点 3.1节点亲和性 3.2Pod亲和性与反亲和性 3.2…