2.7.4.2 相似度计算
- 目的:计算18号Python频道的文章之间相似度
- 步骤:
- 1、读取数据,进行类型处理(数组到Vector)
- 2、BRP进行FIT
读取数据,进行类型处理(数组到Vector)
from pyspark.ml.linalg import Vectors
# 选取部分数据做测试
article_vector = w2v.spark.sql("select article_id, articlevector from article_vector where channel_id=18 limit 10")
train = articlevector.select(['article_id', 'articleVector'])def _array_to_vector(row):return row.article_id, Vectors.dense(row.articleVector)train = train.rdd.map(_array_to_vector).toDF(['article_id', 'articleVector'])
BRP进行FIT
- class pyspark.ml.feature.BucketedRandomProjectionLSH(inputCol=None, outputCol=None, seed=None, numHashTables=1, bucketLength=None)
- inputCol=None:输入特征列
- outputCol=None:输出特征列
- numHashTables=1:哈希表数量,几个hash function对数据进行hash操作
- bucketLength=None:桶的数量,值越大相同数据进入到同一个桶的概率越高
- method:
- approxSimilarityJoin(df1, df2, 2.0, distCol='EuclideanDistance')
- 计算df1每个文章相似的df2数据集的数据
from pyspark.ml.feature import BucketedRandomProjectionLSHbrp = BucketedRandomProjectionLSH(inputCol='articleVector', outputCol='hashes', numHashTables=4.0, bucketLength=10.0)
model = brp.fit(train)
计算相似的文章以及相似度
similar = model.approxSimilarityJoin(test, train, 2.0, distCol='EuclideanDistance')similar.sort(['EuclideanDistance']).show()
2.7.4.3 问题3
对于计算出来的相似度,是要在推荐的时候使用。那么我们所知的是,HIVE只适合在离线分析时候使用,因为运行速度慢,所以只能将相似度存储到HBASE当中
- hbase
2.7.5 文章相似度存储
-
目的:将所有文章对应相似度文章及其相似度保存
-
步骤:
- 调用foreachPartition
- foreachPartition不同于map和mapPartition,主要用于离线分析之后的数据落地,如果想要返回新的一个数据DF,就使用map后者。
- 调用foreachPartition
我们需要建立一个HBase存储文章相似度的表
create 'article_similar', 'similar'# 存储格式如下:key:为article_id, 'similar:article_id', 结果为相似度
put 'article_similar', '1', 'similar:1', 0.2
put 'article_similar', '1', 'similar:2', 0.34
put 'article_similar', '1', 'similar:3', 0.267
put 'article_similar', '1', 'similar:4', 0.56
put 'article_similar', '1', 'similar:5', 0.7
put 'article_similar', '1', 'similar:6', 0.819
put 'article_similar', '1', 'similar:8', 0.28
定义保存HBASE函数,确保我们的happybase连接hbase启动成功,Thrift服务打开。hbase集群出现退出等问题常见原因,配置文件hadoop目录,地址等,还有
- ntpdate 0.cn.pool.ntp.org或者ntpdate ntp1.aliyun.com
- hbase-daemon.sh start thrift
def save_hbase(partition):import happybasepool = happybase.ConnectionPool(size=3, host='hadoop-master')with pool.connection() as conn:# 建议表的连接table = conn.table('article_similar')for row in partition:if row.datasetA.article_id == row.datasetB.article_id:passelse:table.put(str(row.datasetA.article_id).encode(),{"similar:{}".format(row.datasetB.article_id).encode(): b'%0.4f' % (row.EuclideanDistance)})# 手动关闭所有的连接conn.close()similar.foreachPartition(save_hbase)