在前面的文章中,我们实现了召回和排序,接下来将进入推荐逻辑处理阶段,通常称为推荐中心,推荐中心负责接收应用系统的推荐请求,读取召回和排序的结果并进行调整,最后返回给应用系统。推荐中心的调用流程如下所示:
推荐接口设计
通常推荐接口包括 Feed 流推荐和相似文章推荐
Feed 流推荐:根据用户偏好,获取推荐文章列表(这里的时间戳用于区分是刷新推荐列表还是查看历史推荐列表)
参数:用户 ID,频道 ID,推荐文章数量,请求推荐的时间戳
结果:曝光参数,每篇文章的行为埋点参数,上一条推荐的时间戳
相似文章推荐:当用户浏览某文章时,获取该文章的相似文章列表
参数:文章 ID,推荐文章数量
结果:文章 ID 列表
行为埋点参数:
{
"param": '{"action": "exposure", "userId": 1, "articleId": [1,2,3,4], "algorithmCombine": "c1"}',
"recommends": [
{"article_id": 1, "param": {"click": "{"action": "click", "userId": "1", "articleId": 1, "algorithmCombine": 'c1'}", "collect": "...", "share": "...","read":"..."}},
{"article_id": 2, "param": {"click": "...", "collect": "...", "share": "...", "read":"..."}},
{"article_id": 3, "param": {"click": "...", "collect": "...", "share": "...", "read":"..."}},
{"article_id": 4, "param": {"click": "...", "collect": "...", "share": "...", "read":"..."}}
]
"timestamp": 1546391572
}
这里接口采用 gRPC 框架,在 user_reco.proto 文件中定义 Protobuf 序列化协议,其中定义了 Feed 流推荐接口:rpc user_recommend(User) returns (Track) {} 和相似文章接口:rpc article_recommend(Article) returns(Similar) {}
syntax = "proto3";
message User {
string user_id = 1;
int32 channel_id = 2;
int32 article_num = 3;
int64 time_stamp = 4;
}
// int32 ---> int64 article_id
message Article {
int64 article_id = 1;
int32 article_num = 2;
}
message param2 {
string click = 1;
string collect = 2;
string share = 3;
string read = 4;
}
message param1 {
int64 article_id = 1;
param2 params = 2;
}
message Track {
string exposure = 1;
repeated param1 recommends = 2;
int64 time_stamp = 3;
}
message Similar {
repeated int64 article_id = 1;
}
service UserRecommend {
rpc user_recommend(User) returns (Track) {}
rpc article_recommend(Article) returns(Similar) {}
}
接着,通过如下命令生成服务端文件 user_reco_pb2.py 和客户端文件 user_reco_pb2_grpc.py
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. user_reco.proto
定义参数解析类,用于解析推荐请求的参数,包括用户 ID、频道 ID、文章数量、请求时间戳以及算法名称
class Temp(object):
user_id = -10
channel_id = -10
article_num = -10
time_stamp = -10
algo = ""
定义封装埋点参数方法,其中参数 res 为推荐结果,参数 temp 为用户请求参数,将推荐结果封装为在 user_reco.proto 文件中定义的 Track 结构,其中携带了文章对埋点参数,包括了事件名称、算法名称以及时间等等,方便后面解析用户对文章对行为信息
def add_track(res, temp):
"""
封装埋点参数
:param res: 推荐文章id列表
:param temp: rpc参数
:return: 埋点参数
文章列表参数
单文章参数
"""
# 添加埋点参数
track = {}
# 准备曝光参数
# 全部字符串形式提供,在hive端不会解析问题
_exposure = {"action": "exposure", "userId": temp.user_id, "articleId": json.dumps(res),
"algorithmCombine": temp.algo}
track['param'] = json.dumps(_exposure)
track['recommends'] = []
# 准备其它点击参数
for _id in res:
# 构造字典
_dic = {}
_dic['article_id'] = _id
_dic['param'] = {}
# 准备click参数
_p = {"action": "click", "userId": temp.user_id, "articleId": str(_id),
"algorithmCombine": temp.algo}
_dic['param']['click'] = json.dumps(_p)
# 准备collect参数
_p["action"] = 'collect'
_dic['param']['collect'] = json.dumps(_p)
# 准备share参数
_p["action"] = 'share'
_dic['param']['share'] = json.dumps(_p)
# 准备detentionTime参数
_p["action"] = 'read'
_dic['param']['read'] = json.dumps(_p)
track['recommends'].append(_dic)
track['timestamp'] = temp.time_stamp
return track
AB Test 流量切分
由于推荐算法和策略是需要不断改进和完善等,所以 ABTest 也是推荐系统不可或缺的功能。可以根据用户 ID 将流量切分为多个桶(Bucket),每个桶对应一种排序策略,桶内流量将使用相应的策略进行排序,使用 ID 进行流量切分能够保证用户体验的一致性。通常 ABTest 过程如下所示:
通过定义 AB Test 参数,可以实现为不同的用户使用不同的推荐算法策略,其中 COMBINE 为融合方式,RECALL 为召回方式,SORT 为排序方式,CHANNEL 为频道数量,BYPASS 为分桶设置,sort_dict 为不同的排序服务对象。可以看到 Algo-1 使用 LR 进行排序,而 Algo-2 使用 Wide&Deep 进行排序
from collections import namedtuple
# ABTest参数信息
param = namedtuple('RecommendAlgorithm', ['COMBINE',
'RECALL',
'SORT',
'CHANNEL',
'BYPASS']
)
RAParam = param(
COMBINE={
'Algo-1': (1, [100, 101, 102, 103, 104], [200]), # 首页推荐,所有召回结果读取+LR排序
'Algo-2': (2, [100, 101, 102, 103, 104], [201]) # 首页推荐,所有召回结果读取 排序
},
RECALL={
100: ('cb_recall', 'als'), # 离线模型ALS召回,recall:user:1115629498121 column=als:18
101: ('cb_recall', 'content'), # 离线word2vec的画像内容召回 'recall:user:5', 'content:1'
102: ('cb_recall', 'online'), # 在线word2vec的画像召回 'recall:user:1', 'online:1'
103: 'new_article', # 新文章召回 redis当中 ch:18:new
104: 'popular_article', # 基于用户协同召回结果 ch:18:hot
105: ('article_similar', 'similar') # 文章相似推荐结果 '1' 'similar:2'
},
SORT={
200: 'LR',
201: 'WDL'
},
CHANNEL=25,
BYPASS=[
{
"Bucket": ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd'],
"Strategy": "Algo-1"
},
{
"BeginBucket": ['e', 'f'],
"Strategy": "Algo-2"
}
]
)
sort_dict = {
"LR": lr_sort_service,
"WDL": wdl_sort_service
}
流量切分,将用户 ID 进行哈希,然后取哈希结果的第一个字符,将包含该字符的策略桶所对应的算法编号赋值到此用户请求参数的 algo 属性中,后面将调用该编号对应的算法策略为此用户计算推荐数据
import hashlib
from setting.default import DefaultConfig, RAParam
# 进行分桶实现分流,制定不同的实验策略
bucket = hashlib.md5(user_id.encode()).hexdigest()[:1]
if bucket in RAParam.BYPASS[0]['Bucket']:
temp.algo = RAParam.BYPASS[0]['Strategy']
else:
temp.algo = RAParam.BYPASS[1]['Strategy']
推荐中心逻辑
推荐中心逻辑主要包括:
接收应用系统发送的推荐请求,解析请求参数
进行 ABTest 分流,为用户分配推荐策略
根据分配的算法调用召回服务和排序服务,读取推荐结果
根据业务进行调整,如过滤、补足、合并信息等
封装埋点参数,返回推荐结果
首先,在 Hbase 中创建历史推荐结果表 history_recommend,用于存储用户历史推荐结果
create 'history_recommend', {NAME=>'channel', TTL=>7776000, VERSIONS=>999999} 86400
# 每次指定一个时间戳,可以达到不同版本的效果
put 'history_recommend', 'reco:his:1', 'channel:18', [17283, 140357, 14668, 15182, 17999, 13648, 12884,18135]
继续在 Hbase 中创建待推荐结果表 wait_recommend,用于存储经过多路召回并且排序之后的待推荐结果,当 wait_recommend 没有数据时,才再次调用排序服务计算出新的待推荐结果并写入到 wait_recommend,所以不需设置多个版本。注意该表与 cb_recall 的区别,cb_recall 存储的是还未经排序的召回结果。
create 'wait_recommend', 'channel'
put 'wait_recommend', 'reco:1', 'channel:18', [17283, 140357, 14668, 15182, 17999, 13648, 12884,18135]
put 'wait_recommend', 'reco:1', 'channel:0', [17283, 140357, 14668, 15182, 17999, 13648, 12884, 17302, 13846]
用户获取 Feed 流推荐数据时,如果用户向下滑动,发出的是刷新推荐列表的请求,需要传入当前时间作为请求时间戳参数,该请求时间戳必然大于 Hbase 历史推荐结果表中的请求时间戳,那么程序将获取新的推荐列表,并返回 Hbase 历史推荐结果表中最近一次推荐的请求时间戳,用于查询历史推荐结果;如果用户向上滑动,发出的是查看历史推荐结果的请求,需要传入前面刷新推荐列表时返回的最近一次推荐的请求时间戳,该请求时间戳必然小于等于 Hbase 历史推荐结果中最近一次推荐的时间戳,那么程序将获取小于等于该请求时间戳的最近一次历史推荐结果,并返回小于该推荐结果最近一次推荐的时间戳,也就是上一次推荐的时间戳,下面是具体实现。
在获取推荐列表时,首先获取用户的历史数据库中最近一次时间戳 last_stamp,没有则将 last_stamp 置为 0
try:
last_stamp = self.hbu.get_table_row('history_recommend',
'reco:his:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode(),
include_timestamp=True)[1]
except Exception as e:
last_stamp = 0
如果用户请求的时间戳小于历史推荐结果中最近一次请求的时间戳 last_stamp,那么该请求为用户获取历史推荐结果
1.如果没有历史推荐结果,则返回时间戳 0 以及空列表 []
2.如果历史推荐结果只有一条,则返回这一条历史推荐结果并返回时间戳 0,表示已经没有历史推荐结果(APP 可以显示已经没有历史推荐记录了)
3.如果历史推荐结果有多条,则返回历史推荐结果中第一条推荐结果(最近一次),然后返回历史推荐结果中第二条推荐结果的时间戳
if temp.time_stamp < last_stamp:
try:
row = self.hbu.get_table_cells('history_recommend',
'reco:his:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode(),
timestamp=temp.time_stamp + 1,
include_timestamp=True)
except Exception as e:
row = []
res = []
if not row:
temp.time_stamp = 0
res = []
elif len(row) == 1 and row[0][1] == temp.time_stamp:
res = eval(row[0][0])
temp.time_stamp = 0
elif len(row) >= 2:
res = eval(row[0][0])
temp.time_stamp = int(row[1][1])
res = list(map(int, res))
# 封装推荐结果
track = add_track(res, temp)
# 曝光参数设置为空
track['param'] = ''
(注意:这里将用户请求的时间戳 +1,因为 Hbase 只能获取小于该时间戳的历史推荐结果)
如果用户请求的时间戳大于 Hbase 历史推荐结果中最近一次请求的时间戳 last_stamp,那么该请求为用户刷新推荐列表,需要读取推荐结果并返回。如果结果为空,需要调用 user_reco_list() 方法,再次计算推荐结果,再返回。
if temp.time_stamp > last_stamp:
# 获取缓存
res = redis_cache.get_reco_from_cache(temp, self.hbu)
# 如果结果为空,需要再次计算推荐结果 进行召回+排序,同时写入到hbase待推荐结果列表
if not res:
res = self.user_reco_list(temp)
temp.time_stamp = int(last_stamp)
track = add_track(res, temp)
定义 user_reco_list() 方法,首先要读取多路召回结果,根据为用户分配的算法策略,读取相应路径的召回结果,并进行重后合并
reco_set = []
# (1, [100, 101, 102, 103, 104], [200])
for number in RAParam.COMBINE[temp.algo][1]:
if number == 103:
_res = self.recall_service.read_redis_new_article(temp.channel_id)
reco_set = list(set(reco_set).union(set(_res)))
elif number == 104:
_res = self.recall_service.read_redis_hot_article(temp.channel_id)
reco_set = list(set(reco_set).union(set(_res)))
else:
# 100, 101, 102召回结果读取
_res = self.recall_service.read_hbase_recall(RAParam.RECALL[number][0],
'recall:user:{}'.format(temp.user_id).encode(),
'{}:{}'.format(RAParam.RECALL[number][1],
temp.channel_id).encode())
reco_set = list(set(reco_set).union(set(_res)))
接着,过滤当前该请求频道的历史推荐结果,如果不是 0 频道还需过滤 0 频道的历史推荐结果
history_list = []
data = self.hbu.get_table_cells('history_recommend',
'reco:his:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode())
for _ in data:
history_list = list(set(history_list).union(set(eval(_))))
data = self.hbu.get_table_cells('history_recommend',
'reco:his:{}'.format(temp.user_id).encode(),
'channel:{}'.format(0).encode())
for _ in data:
history_list = list(set(history_list).union(set(eval(_))))
reco_set = list(set(reco_set).difference(set(history_list)))
最后,根据分配的算法策略,调用排序服务,将分数最高的 N 个推荐结果返回,并写入历史推荐结果表,如果还有剩余的排序结果,将其余写入待推荐结果表
# 使用指定模型对召回结果进行排序
# temp.user_id, reco_set
_sort_num = RAParam.COMBINE[temp.algo][2][0]
# 'LR'
reco_set = sort_dict[RAParam.SORT[_sort_num]](reco_set, temp, self.hbu)
if not reco_set:
return reco_set
else:
# 如果reco_set小于用户需要推荐的文章
if len(reco_set) <= temp.article_num:
res = reco_set
else:
# 大于要推荐的文章结果
res = reco_set[:temp.article_num]
# 将剩下的文章列表写入待推荐的结果
self.hbu.get_table_put('wait_recommend',
'reco:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode(),
str(reco_set[temp.article_num:]).encode(),
timestamp=temp.time_stamp)
# 直接写入历史记录当中,表示这次又成功推荐一次
self.hbu.get_table_put('history_recommend',
'reco:his:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode(),
str(res).encode(),
timestamp=temp.time_stamp)
return res
到这里,推荐中心的基本逻辑已经结束。下面是读取多路召回结果的实现细节:通过指定列族,读取基于模型、离线内容以及在线的召回结果,并删除 cb_recall 的召回结果
def read_hbase_recall_data(self, table_name, key_format, column_format):
"""
读取cb_recall当中的推荐数据
读取的时候可以选择列族进行读取als, online, content
:return:
"""
recall_list = []
data = self.hbu.get_table_cells(table_name, key_format, column_format)
# data是多个版本的推荐结果[[],[],[],]
for _ in data:
recall_list = list(set(recall_list).union(set(eval(_))))
self.hbu.get_table_delete(table_name, key_format, column_format)
return recall_list
读取 redis 中的新文章
def read_redis_new_article(self, channel_id):
"""
读取新文章召回结果
:param channel_id: 提供频道
:return:
"""
_key = "ch:{}:new".format(channel_id)
try:
res = self.client.zrevrange(_key, 0, -1)
except Exception as e:
res = []
return list(map(int, res))
读取 redis 中的热门文章,并选取热度最高的前 K 个文章
def read_redis_hot_article(self, channel_id):
"""
读取热门文章召回结果
:param channel_id: 提供频道
:return:
"""
_key = "ch:{}:hot".format(channel_id)
try:
res = self.client.zrevrange(_key, 0, -1)
except Exception as e:
# 由于每个频道的热门文章有很多,因为 保留文章点击次数
res = list(map(int, res))
if len(res) > self.hot_num:
res = res[:self.hot_num]
return res
读取相似文章
def read_hbase_article_similar(self, table_name, key_format, article_num):
"""获取文章相似结果
:param article_id: 文章id
:param article_num: 文章数量
:return:
"""
try:
_dic = self.hbu.get_table_row(table_name, key_format)
res = []
_srt = sorted(_dic.items(), key=lambda obj: obj[1], reverse=True)
if len(_srt) > article_num:
_srt = _srt[:article_num]
for _ in _srt:
res.append(int(_[0].decode().split(':')[1]))
except Exception as e:
res = []
return res
使用缓存策略
如果 redis 缓存中存在数据,就直接从 redis 缓存中获取推荐结果
如果 redis 缓存为空而 Hbase 的待推荐结果表 wait_recommend 不为空,则从 wait_recommend 中获取推荐结果,并将一定数量的待推荐结果放入 redis 缓存中
若 redis 和 wait_recommend 都为空,则需读取召回结果并进行排序,将排序结果写入 Hbase 的待推荐结果表 wait_recommend 中及 redis 中
(每次读取的推荐结果都要将其写入 Hbase 的历史推荐结果表 history_recommend 中)
读取 redis 缓存
#读取redis对应的键
key = 'reco:{}:{}:art'.format(temp.user_id, temp.channel_id)
# 读取,删除,返回结果
pl = cache_client.pipeline()
# 读取redis数据
res = cache_client.zrevrange(key, 0, temp.article_num - 1)
if res:
# 手动删除读取出来的缓存结果
pl.zrem(key, *res)
如果 redis 缓存为空
else:
# 删除键
cache_client.delete(key)
try:
# 从wait_recommend中读取
wait_cache = eval(hbu.get_table_row('wait_recommend',
'reco:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode()))
except Exception as e:
wait_cache = []
# 如果为空则直接返回空
if not wait_cache:
return wait_cache
# 如果wait_recommend中有数据
if len(wait_cache) > 100:
cache_redis = wait_cache[:100]
# 前100个数据放入redis
pl.zadd(key, dict(zip(cache_redis, range(len(cache_redis)))))
# 100个后面的数据,在放回wait_recommend
hbu.get_table_put('wait_recommend',
'reco:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode(),
str(wait_cache[100:]).encode())
else:
# 清空wait_recommend数据
hbu.get_table_put('wait_recommend',
'reco:{}'.format(temp.user_id).encode(),
'channel:{}'.format(temp.channel_id).encode(),
str([]).encode())
# 所有不足100个数据,放入redis
pl.zadd(key, dict(zip(wait_cache, range(len(wait_cache)))))
res = cache_client.zrange(key, 0, temp.article_num - 1)
最后,在 Supervisor 中配置 gRPC 实时推荐程序
[program:online]
environment=JAVA_HOME=/root/bigdata/jdk,SPARK_HOME=/root/bigdata/spark,HADOOP_HOME=/root/bigdata/hadoop,PYSPARK_PYTHON=/miniconda2/envs/reco_sys/bin/python ,PYSPARK_DRIVER_PYTHON=/miniconda2/envs/reco_sys/bin/python
command=/miniconda2/envs/reco_sys/bin/python /root/toutiao_project/reco_sys/abtest/routing.py
directory=/root/toutiao_project/reco_sys/abtest
user=root
autorestart=true
redirect_stderr=true
stdout_logfile=/root/logs/recommendsuper.log
loglevel=info
stopsignal=KILL
stopasgroup=true
killasgroup=true
The End
文章推荐系统系列到此就完结啦~ 撒花 🎉🎉🎉
若有疏漏的地方,欢迎各位多多指正,感谢关注,love & peace. 🙏🙏🙏
参考