电影推荐系统 python简书_文章推荐系统 | 十四、推荐中心

news/2024/11/30 1:42:12/

在前面的文章中,我们实现了召回和排序,接下来将进入推荐逻辑处理阶段,通常称为推荐中心,推荐中心负责接收应用系统的推荐请求,读取召回和排序的结果并进行调整,最后返回给应用系统。推荐中心的调用流程如下所示:

推荐接口设计

通常推荐接口包括 Feed 流推荐和相似文章推荐

Feed 流推荐:根据用户偏好,获取推荐文章列表(这里的时间戳用于区分是刷新推荐列表还是查看历史推荐列表)

参数:用户 ID,频道 ID,推荐文章数量,请求推荐的时间戳

结果:曝光参数,每篇文章的行为埋点参数,上一条推荐的时间戳

相似文章推荐:当用户浏览某文章时,获取该文章的相似文章列表

参数:文章 ID,推荐文章数量

结果:文章 ID 列表

行为埋点参数:

{

"param": '{"action": "exposure", "userId": 1, "articleId": [1,2,3,4], "algorithmCombine": "c1"}',

"recommends": [

{"article_id": 1, "param": {"click": "{"action": "click", "userId": "1", "articleId": 1, "algorithmCombine": 'c1'}", "collect": "...", "share": "...","read":"..."}},

{"article_id": 2, "param": {"click": "...", "collect": "...", "share": "...", "read":"..."}},

{"article_id": 3, "param": {"click": "...", "collect": "...", "share": "...", "read":"..."}},

{"article_id": 4, "param": {"click": "...", "collect": "...", "share": "...", "read":"..."}}

]

"timestamp": 1546391572

}

这里接口采用 gRPC 框架,在 user_reco.proto 文件中定义 Protobuf 序列化协议,其中定义了 Feed 流推荐接口:rpc user_recommend(User) returns (Track) {} 和相似文章接口:rpc article_recommend(Article) returns(Similar) {}

syntax = "proto3";

message User {

string user_id = 1;

int32 channel_id = 2;

int32 article_num = 3;

int64 time_stamp = 4;

}

// int32 ---> int64 article_id

message Article {

int64 article_id = 1;

int32 article_num = 2;

}

message param2 {

string click = 1;

string collect = 2;

string share = 3;

string read = 4;

}

message param1 {

int64 article_id = 1;

param2 params = 2;

}

message Track {

string exposure = 1;

repeated param1 recommends = 2;

int64 time_stamp = 3;

}

message Similar {

repeated int64 article_id = 1;

}

service UserRecommend {

rpc user_recommend(User) returns (Track) {}

rpc article_recommend(Article) returns(Similar) {}

}

接着,通过如下命令生成服务端文件 user_reco_pb2.py 和客户端文件 user_reco_pb2_grpc.py

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. user_reco.proto

定义参数解析类,用于解析推荐请求的参数,包括用户 ID、频道 ID、文章数量、请求时间戳以及算法名称

class Temp(object):

user_id = -10

channel_id = -10

article_num = -10

time_stamp = -10

algo = ""

定义封装埋点参数方法,其中参数 res 为推荐结果,参数 temp 为用户请求参数,将推荐结果封装为在 user_reco.proto 文件中定义的 Track 结构,其中携带了文章对埋点参数,包括了事件名称、算法名称以及时间等等,方便后面解析用户对文章对行为信息

def add_track(res, temp):

"""

封装埋点参数

:param res: 推荐文章id列表

:param temp: rpc参数

:return: 埋点参数

文章列表参数

单文章参数

"""

# 添加埋点参数

track = {}

# 准备曝光参数

# 全部字符串形式提供,在hive端不会解析问题

_exposure = {"action": "exposure", "userId": temp.user_id, "articleId": json.dumps(res),

"algorithmCombine": temp.algo}

track['param'] = json.dumps(_exposure)

track['recommends'] = []

# 准备其它点击参数

for _id in res:

# 构造字典

_dic = {}

_dic['article_id'] = _id

_dic['param'] = {}

# 准备click参数

_p = {"action": "click", "userId": temp.user_id, "articleId": str(_id),

"algorithmCombine": temp.algo}

_dic['param']['click'] = json.dumps(_p)

# 准备collect参数

_p["action"] = 'collect'

_dic['param']['collect'] = json.dumps(_p)

# 准备share参数

_p["action"] = 'share'

_dic['param']['share'] = json.dumps(_p)

# 准备detentionTime参数

_p["action"] = 'read'

_dic['param']['read'] = json.dumps(_p)

track['recommends'].append(_dic)

track['timestamp'] = temp.time_stamp

return track

AB Test 流量切分

由于推荐算法和策略是需要不断改进和完善等,所以 ABTest 也是推荐系统不可或缺的功能。可以根据用户 ID 将流量切分为多个桶(Bucket),每个桶对应一种排序策略,桶内流量将使用相应的策略进行排序,使用 ID 进行流量切分能够保证用户体验的一致性。通常 ABTest 过程如下所示:

通过定义 AB Test 参数,可以实现为不同的用户使用不同的推荐算法策略,其中 COMBINE 为融合方式,RECALL 为召回方式,SORT 为排序方式,CHANNEL 为频道数量,BYPASS 为分桶设置,sort_dict 为不同的排序服务对象。可以看到 Algo-1 使用 LR 进行排序,而 Algo-2 使用 Wide&Deep 进行排序

from collections import namedtuple

# ABTest参数信息

param = namedtuple('RecommendAlgorithm', ['COMBINE',

'RECALL',

'SORT',

'CHANNEL',

'BYPASS']

)

RAParam = param(

COMBINE={

'Algo-1': (1, [100, 101, 102, 103, 104], [200]), # 首页推荐,所有召回结果读取+LR排序

'Algo-2': (2, [100, 101, 102, 103, 104], [201]) # 首页推荐,所有召回结果读取 排序

},

RECALL={

100: ('cb_recall', 'als'), # 离线模型ALS召回,recall:user:1115629498121 column=als:18

101: ('cb_recall', 'content'), # 离线word2vec的画像内容召回 'recall:user:5', 'content:1'

102: ('cb_recall', 'online'), # 在线word2vec的画像召回 'recall:user:1', 'online:1'

103: 'new_article', # 新文章召回 redis当中 ch:18:new

104: 'popular_article', # 基于用户协同召回结果 ch:18:hot

105: ('article_similar', 'similar') # 文章相似推荐结果 '1' 'similar:2'

},

SORT={

200: 'LR',

201: 'WDL'

},

CHANNEL=25,

BYPASS=[

{

"Bucket": ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd'],

"Strategy": "Algo-1"

},

{

"BeginBucket": ['e', 'f'],

"Strategy": "Algo-2"

}

]

)

sort_dict = {

"LR": lr_sort_service,

"WDL": wdl_sort_service

}

流量切分,将用户 ID 进行哈希,然后取哈希结果的第一个字符,将包含该字符的策略桶所对应的算法编号赋值到此用户请求参数的 algo 属性中,后面将调用该编号对应的算法策略为此用户计算推荐数据

import hashlib

from setting.default import DefaultConfig, RAParam

# 进行分桶实现分流,制定不同的实验策略

bucket = hashlib.md5(user_id.encode()).hexdigest()[:1]

if bucket in RAParam.BYPASS[0]['Bucket']:

temp.algo = RAParam.BYPASS[0]['Strategy']

else:

temp.algo = RAParam.BYPASS[1]['Strategy']

推荐中心逻辑

推荐中心逻辑主要包括:

接收应用系统发送的推荐请求,解析请求参数

进行 ABTest 分流,为用户分配推荐策略

根据分配的算法调用召回服务和排序服务,读取推荐结果

根据业务进行调整,如过滤、补足、合并信息等

封装埋点参数,返回推荐结果

首先,在 Hbase 中创建历史推荐结果表 history_recommend,用于存储用户历史推荐结果

create 'history_recommend', {NAME=>'channel', TTL=>7776000, VERSIONS=>999999} 86400

# 每次指定一个时间戳,可以达到不同版本的效果

put 'history_recommend', 'reco:his:1', 'channel:18', [17283, 140357, 14668, 15182, 17999, 13648, 12884,18135]

继续在 Hbase 中创建待推荐结果表 wait_recommend,用于存储经过多路召回并且排序之后的待推荐结果,当 wait_recommend 没有数据时,才再次调用排序服务计算出新的待推荐结果并写入到 wait_recommend,所以不需设置多个版本。注意该表与 cb_recall 的区别,cb_recall 存储的是还未经排序的召回结果。

create 'wait_recommend', 'channel'

put 'wait_recommend', 'reco:1', 'channel:18', [17283, 140357, 14668, 15182, 17999, 13648, 12884,18135]

put 'wait_recommend', 'reco:1', 'channel:0', [17283, 140357, 14668, 15182, 17999, 13648, 12884, 17302, 13846]

用户获取 Feed 流推荐数据时,如果用户向下滑动,发出的是刷新推荐列表的请求,需要传入当前时间作为请求时间戳参数,该请求时间戳必然大于 Hbase 历史推荐结果表中的请求时间戳,那么程序将获取新的推荐列表,并返回 Hbase 历史推荐结果表中最近一次推荐的请求时间戳,用于查询历史推荐结果;如果用户向上滑动,发出的是查看历史推荐结果的请求,需要传入前面刷新推荐列表时返回的最近一次推荐的请求时间戳,该请求时间戳必然小于等于 Hbase 历史推荐结果中最近一次推荐的时间戳,那么程序将获取小于等于该请求时间戳的最近一次历史推荐结果,并返回小于该推荐结果最近一次推荐的时间戳,也就是上一次推荐的时间戳,下面是具体实现。

在获取推荐列表时,首先获取用户的历史数据库中最近一次时间戳 last_stamp,没有则将 last_stamp 置为 0

try:

last_stamp = self.hbu.get_table_row('history_recommend',

'reco:his:{}'.format(temp.user_id).encode(),

'channel:{}'.format(temp.channel_id).encode(),

include_timestamp=True)[1]

except Exception as e:

last_stamp = 0

如果用户请求的时间戳小于历史推荐结果中最近一次请求的时间戳 last_stamp,那么该请求为用户获取历史推荐结果

1.如果没有历史推荐结果,则返回时间戳 0 以及空列表 []

2.如果历史推荐结果只有一条,则返回这一条历史推荐结果并返回时间戳 0,表示已经没有历史推荐结果(APP 可以显示已经没有历史推荐记录了)

3.如果历史推荐结果有多条,则返回历史推荐结果中第一条推荐结果(最近一次),然后返回历史推荐结果中第二条推荐结果的时间戳

if temp.time_stamp < last_stamp:

try:

row = self.hbu.get_table_cells('history_recommend',

'reco:his:{}'.format(temp.user_id).encode(),

'channel:{}'.format(temp.channel_id).encode(),

timestamp=temp.time_stamp + 1,

include_timestamp=True)

except Exception as e:

row = []

res = []

if not row:

temp.time_stamp = 0

res = []

elif len(row) == 1 and row[0][1] == temp.time_stamp:

res = eval(row[0][0])

temp.time_stamp = 0

elif len(row) >= 2:

res = eval(row[0][0])

temp.time_stamp = int(row[1][1])

res = list(map(int, res))

# 封装推荐结果

track = add_track(res, temp)

# 曝光参数设置为空

track['param'] = ''

(注意:这里将用户请求的时间戳 +1,因为 Hbase 只能获取小于该时间戳的历史推荐结果)

如果用户请求的时间戳大于 Hbase 历史推荐结果中最近一次请求的时间戳 last_stamp,那么该请求为用户刷新推荐列表,需要读取推荐结果并返回。如果结果为空,需要调用 user_reco_list() 方法,再次计算推荐结果,再返回。

if temp.time_stamp > last_stamp:

# 获取缓存

res = redis_cache.get_reco_from_cache(temp, self.hbu)

# 如果结果为空,需要再次计算推荐结果 进行召回+排序,同时写入到hbase待推荐结果列表

if not res:

res = self.user_reco_list(temp)

temp.time_stamp = int(last_stamp)

track = add_track(res, temp)

定义 user_reco_list() 方法,首先要读取多路召回结果,根据为用户分配的算法策略,读取相应路径的召回结果,并进行重后合并

reco_set = []

# (1, [100, 101, 102, 103, 104], [200])

for number in RAParam.COMBINE[temp.algo][1]:

if number == 103:

_res = self.recall_service.read_redis_new_article(temp.channel_id)

reco_set = list(set(reco_set).union(set(_res)))

elif number == 104:

_res = self.recall_service.read_redis_hot_article(temp.channel_id)

reco_set = list(set(reco_set).union(set(_res)))

else:

# 100, 101, 102召回结果读取

_res = self.recall_service.read_hbase_recall(RAParam.RECALL[number][0],

'recall:user:{}'.format(temp.user_id).encode(),

'{}:{}'.format(RAParam.RECALL[number][1],

temp.channel_id).encode())

reco_set = list(set(reco_set).union(set(_res)))

接着,过滤当前该请求频道的历史推荐结果,如果不是 0 频道还需过滤 0 频道的历史推荐结果

history_list = []

data = self.hbu.get_table_cells('history_recommend',

'reco:his:{}'.format(temp.user_id).encode(),

'channel:{}'.format(temp.channel_id).encode())

for _ in data:

history_list = list(set(history_list).union(set(eval(_))))

data = self.hbu.get_table_cells('history_recommend',

'reco:his:{}'.format(temp.user_id).encode(),

'channel:{}'.format(0).encode())

for _ in data:

history_list = list(set(history_list).union(set(eval(_))))

reco_set = list(set(reco_set).difference(set(history_list)))

最后,根据分配的算法策略,调用排序服务,将分数最高的 N 个推荐结果返回,并写入历史推荐结果表,如果还有剩余的排序结果,将其余写入待推荐结果表

# 使用指定模型对召回结果进行排序

# temp.user_id, reco_set

_sort_num = RAParam.COMBINE[temp.algo][2][0]

# 'LR'

reco_set = sort_dict[RAParam.SORT[_sort_num]](reco_set, temp, self.hbu)

if not reco_set:

return reco_set

else:

# 如果reco_set小于用户需要推荐的文章

if len(reco_set) <= temp.article_num:

res = reco_set

else:

# 大于要推荐的文章结果

res = reco_set[:temp.article_num]

# 将剩下的文章列表写入待推荐的结果

self.hbu.get_table_put('wait_recommend',

'reco:{}'.format(temp.user_id).encode(),

'channel:{}'.format(temp.channel_id).encode(),

str(reco_set[temp.article_num:]).encode(),

timestamp=temp.time_stamp)

# 直接写入历史记录当中,表示这次又成功推荐一次

self.hbu.get_table_put('history_recommend',

'reco:his:{}'.format(temp.user_id).encode(),

'channel:{}'.format(temp.channel_id).encode(),

str(res).encode(),

timestamp=temp.time_stamp)

return res

到这里,推荐中心的基本逻辑已经结束。下面是读取多路召回结果的实现细节:通过指定列族,读取基于模型、离线内容以及在线的召回结果,并删除 cb_recall 的召回结果

def read_hbase_recall_data(self, table_name, key_format, column_format):

"""

读取cb_recall当中的推荐数据

读取的时候可以选择列族进行读取als, online, content

:return:

"""

recall_list = []

data = self.hbu.get_table_cells(table_name, key_format, column_format)

# data是多个版本的推荐结果[[],[],[],]

for _ in data:

recall_list = list(set(recall_list).union(set(eval(_))))

self.hbu.get_table_delete(table_name, key_format, column_format)

return recall_list

读取 redis 中的新文章

def read_redis_new_article(self, channel_id):

"""

读取新文章召回结果

:param channel_id: 提供频道

:return:

"""

_key = "ch:{}:new".format(channel_id)

try:

res = self.client.zrevrange(_key, 0, -1)

except Exception as e:

res = []

return list(map(int, res))

读取 redis 中的热门文章,并选取热度最高的前 K 个文章

def read_redis_hot_article(self, channel_id):

"""

读取热门文章召回结果

:param channel_id: 提供频道

:return:

"""

_key = "ch:{}:hot".format(channel_id)

try:

res = self.client.zrevrange(_key, 0, -1)

except Exception as e:

# 由于每个频道的热门文章有很多,因为 保留文章点击次数

res = list(map(int, res))

if len(res) > self.hot_num:

res = res[:self.hot_num]

return res

读取相似文章

def read_hbase_article_similar(self, table_name, key_format, article_num):

"""获取文章相似结果

:param article_id: 文章id

:param article_num: 文章数量

:return:

"""

try:

_dic = self.hbu.get_table_row(table_name, key_format)

res = []

_srt = sorted(_dic.items(), key=lambda obj: obj[1], reverse=True)

if len(_srt) > article_num:

_srt = _srt[:article_num]

for _ in _srt:

res.append(int(_[0].decode().split(':')[1]))

except Exception as e:

res = []

return res

使用缓存策略

如果 redis 缓存中存在数据,就直接从 redis 缓存中获取推荐结果

如果 redis 缓存为空而 Hbase 的待推荐结果表 wait_recommend 不为空,则从 wait_recommend 中获取推荐结果,并将一定数量的待推荐结果放入 redis 缓存中

若 redis 和 wait_recommend 都为空,则需读取召回结果并进行排序,将排序结果写入 Hbase 的待推荐结果表 wait_recommend 中及 redis 中

(每次读取的推荐结果都要将其写入 Hbase 的历史推荐结果表 history_recommend 中)

读取 redis 缓存

#读取redis对应的键

key = 'reco:{}:{}:art'.format(temp.user_id, temp.channel_id)

# 读取,删除,返回结果

pl = cache_client.pipeline()

# 读取redis数据

res = cache_client.zrevrange(key, 0, temp.article_num - 1)

if res:

# 手动删除读取出来的缓存结果

pl.zrem(key, *res)

如果 redis 缓存为空

else:

# 删除键

cache_client.delete(key)

try:

# 从wait_recommend中读取

wait_cache = eval(hbu.get_table_row('wait_recommend',

'reco:{}'.format(temp.user_id).encode(),

'channel:{}'.format(temp.channel_id).encode()))

except Exception as e:

wait_cache = []

# 如果为空则直接返回空

if not wait_cache:

return wait_cache

# 如果wait_recommend中有数据

if len(wait_cache) > 100:

cache_redis = wait_cache[:100]

# 前100个数据放入redis

pl.zadd(key, dict(zip(cache_redis, range(len(cache_redis)))))

# 100个后面的数据,在放回wait_recommend

hbu.get_table_put('wait_recommend',

'reco:{}'.format(temp.user_id).encode(),

'channel:{}'.format(temp.channel_id).encode(),

str(wait_cache[100:]).encode())

else:

# 清空wait_recommend数据

hbu.get_table_put('wait_recommend',

'reco:{}'.format(temp.user_id).encode(),

'channel:{}'.format(temp.channel_id).encode(),

str([]).encode())

# 所有不足100个数据,放入redis

pl.zadd(key, dict(zip(wait_cache, range(len(wait_cache)))))

res = cache_client.zrange(key, 0, temp.article_num - 1)

最后,在 Supervisor 中配置 gRPC 实时推荐程序

[program:online]

environment=JAVA_HOME=/root/bigdata/jdk,SPARK_HOME=/root/bigdata/spark,HADOOP_HOME=/root/bigdata/hadoop,PYSPARK_PYTHON=/miniconda2/envs/reco_sys/bin/python ,PYSPARK_DRIVER_PYTHON=/miniconda2/envs/reco_sys/bin/python

command=/miniconda2/envs/reco_sys/bin/python /root/toutiao_project/reco_sys/abtest/routing.py

directory=/root/toutiao_project/reco_sys/abtest

user=root

autorestart=true

redirect_stderr=true

stdout_logfile=/root/logs/recommendsuper.log

loglevel=info

stopsignal=KILL

stopasgroup=true

killasgroup=true

The End

文章推荐系统系列到此就完结啦~ 撒花 🎉🎉🎉

若有疏漏的地方,欢迎各位多多指正,感谢关注,love & peace. 🙏🙏🙏

参考


http://www.ppmy.cn/news/278569.html

相关文章

十分钟柔和护理,轻松舒缓眼疲劳,云康宝眼部按摩仪体验

平时工作、生活中&#xff0c;每天都要长时间对着手机、电脑等电子设备&#xff0c;像是被它们吸走了灵魂&#xff0c;有时候眼睛干干的、痛痛的&#xff0c;像是被沙子刮过&#xff0c;光靠眼药水之类的东西根本解决不了问题&#xff0c;所以趁着618我入手了一款眼部按摩仪&am…

推荐!推荐!推荐!

好久没来这个论坛了&#xff0c;给大家推荐一个很好的视频教程网&#xff0c;里面内容全面丰富&#xff01; http://www.abab123.com/bbs/down.asp?html1148355

推荐7个最新发现的神仙网站,让人心动!

1、云海电子图书馆 云海电子图书馆是一个免费下载电子书的网站&#xff0c;包含经济管理&#xff0c;小说文学&#xff0c;艺术摄影&#xff0c;计算机网络&#xff0c;英语和其它外语&#xff0c;传记等书籍。 2、智办事 智办事是一款以目标为导向&#xff0c;以人为中心&am…

最新主流 Markdown 编辑器推荐

Markdown &#xff0c;2004年由 John Gruberis 设计和开发&#xff0c;是一种可以使用普通文本编辑器编写的标记语言&#xff0c;通过简单的标记语法&#xff0c;它可以使普通文本内容具有一定的格式&#xff0c;以下将介绍目前比较流行的一些 Markdown 编辑器&#xff08;排名…

最新短信平台推荐一览

还不知道用哪个短信平台&#xff1f;下面整理了几个比较靠谱的短信平台&#xff0c;都是质量有保证的&#xff0c;大家可以根据自己的需要选择&#xff0c;绝对不踩雷&#xff01; 1&#xff0c;阿里云短信 价格&#xff1a;按照短信数量计费&#xff0c;国内短信单价在0.045元…

【强烈推荐】2021年最新Python学习路线图

第一阶段、Python基础 1、学习目标&#xff1a; 能够熟练使用Python技术完成针对小问题的程序编写以及小游戏程序的开发。 2、知识点&#xff1a; 1&#xff09;计算机组成原理 计算机组成部分、操作系统分类、B/S和C/S架构、理解软件与硬件的区别 2&#xff09;Python变量以及…

搜索推荐相关

搜索算法 Learning to Rank方法&#xff1a; 1、单文档方法&#xff1a;根据query判断每个item的相似度 2、文档对方法&#xff1a;确定文档对的前后顺序 3、文档列表法&#xff1a;确定所有文档的先后顺序 Item&#xff1a;垂域、意图、语义相似性、item的热度、用户的搜索日…

2023年最新最全的VScode插件推荐

这篇文章主要介绍了这么多年来我在使用 VSCode 过程中用到的一些不错的插件。这些VSCode插件&#xff0c;帮你打造地表最强IDE&#xff01; 一、主题及图标 GitHub Theme 黑白两款皮肤 Material Theme 集成了多种主题皮肤&#xff0c;搭配 Material Icon Theme 食用更佳。【…