【大模型理论篇】ToB的大模型系统非常有必要引入搜索推荐算法能力(回顾BPR、WD、ALS等经典算法)

news/2024/9/17 23:31:43/ 标签: 人工智能, 大模型, 搜索, 推荐, ALS, 宽深模型, BPR算法

1. 背景和思考          

        上周2024上海外滩大会如约而至,各种大咖云集,多种观点思想碰撞,带来很多新的启发。我个人比较关注大模型和隐私计算相关的内容,因此重点听了相关老师带来的行业前沿进展和深度思考。有两位老师的观点,特别认同,一位是百川智能的王小川老师,另一位是上海临港跨境数据的山栋明老师。

王小川老师:

       百川正在打造AI医生,切入具体的医疗场景。场景和技术双螺旋互补式上升。因为有具体的医疗场景,因此基于大模型的AGI能够具像化。才有现实的明确的意义,去推进大模型错误率降低,long context和多模态能力提升。 此外,还提到儿科医生陪伴的应用场景特别好,有了大模型医疗助手,能够通过快速问题分析和解决方案获得,可以大幅度降低家长的焦虑。并且全科医生就可以解决很多问题,起到分流作用,能够缓解医疗资源的集中挤兑问题。

山栋明老师:

        未来赚什么钱?要赚新物种的钱。大模型可以创造新物种,未来创新创业机会来自于新物种。积极拥抱模型,是因为模型的引入带来了生产率的大幅提升。基模被大象(大厂)垄断,需要庞大的算力资源,因此对于更大多数,垂类领域的机会要更多。一个健康的生态,必须是大象、蚂蚁、花草都需要具备,不然行业迟早玩完。我们需要做到对基模更懂行业,比行业更懂基模,这样才有机会

        另外,对于数据要素,他谈到四个阶段:数据资源->数据产品->数据资产->数据资本,涉及自然属性、商品属性、金融属性。数据具备自然资源属性的时候,是可以被称为数据资源,但该层面还比较原始,需要上升一级,加工成数据产品,能够交易流通,这对于数据要素至关重要。可被交易交换流通的数据产品,能够形成数据资产,能够支持价值的评估,进一步推动数据入表甚至可以银行贷款。再上一个水平,就到了数据资本,数据可以被证券化,形成资本玩法。所以数据要素不是静态的,而是动态演进。

        个人非常认同上述观点。王小川老师之所以提出要引入搜索相关能力,其实侧面反应出仅依赖大模型自身的能力,在ToB场景,特别是对于错误容忍度非常低的医疗场景,是远远不够的,比如大模型存在的幻觉问题,大模型自身在医疗场景的知识匮乏或者说不够及时,都可能影响模型的效果。因此很有必要引入检索增强技术RAG,将知识图谱、搜索排序等技术融合使用。在前期的文章《大模型LLM在垂直领域的应用(RAG、微调等)分析》中,我们对RAG进行了系统的介绍。其中对于RAG面临的问题重点进行了阐述,其中就包含missed top ranked、not extracted等搜索匹配失败的问题,造成不能回复出正确结果。

        上图为创建检索增强生成(RAG)系统所需的索引和查询过程。索引过程通常在开发阶段进行,而查询过程则在运行时进行。红色框为可能产生的故障点。本文探讨的重点主要在排序功能和准确性。很有必要在这些环境,实施排序推荐技术。因此我们接下来会回顾一些传统的推荐排序算法,一方面温故知新,另一方面也可以带着RAG技术背景做进一步思考。

2. 经典推荐&排序算法技术回顾

        【1,2,3】展示了一般的推荐系统架构以及处理逻辑。后续我们主要针对推荐&排序算法(会更侧重推荐场景来描述排序的优化)来讲解。排序是推荐系统的核心问题推荐系统在向用户展示推荐结果时,需要对可能的推荐项进行排序,以将最相关的内容排在前面。在这个过程中,Learn To Rank 方法很关键。Pointwise 方法可以直接用于预测评分,Pairwise 方法可以用来优化排序的准确性,而 Listwise 方法可以最大化整体推荐列表的质量。

2.1 BPR: Bayesian Personalized Ranking

2.1.1 算法简介

        BPR(Bayesian Personalized Ranking,贝叶斯个性化排序)【4】是一种常用于推荐系统中的排序算法。它主要用于处理隐式反馈数据,例如用户对某些项目的点击、浏览、停留时间、分享等行为。BPR的目标是通过最大化用户对项目的排名顺序的后验概率,来生成个性化的推荐列表。

        BPR的核心思想是基于配对比较的理念,即它通过比较用户更喜欢的项目对来学习个性化的排序模型。对于一个特定的用户,假设用户对已交互项目的偏好高于未交互项目。模型通过优化一个损失函数来最大化这种偏好的正确性,从而生成个性化的推荐结果。

        (1)图中的左侧显示了观测数据 S。直接从 S中学习是不可行的,因为只观测到正反馈。通常情况下,通过将矩阵填充为 0 值来生成负反馈数据。BPR方法创用户特定的项目对偏好i >_u j。在(2)图中,“加号”(+)表示用户更喜欢项目 i 而不是项目 j;“减号”(-)表示用户更喜欢项目 j 而不是项目 i。        

2.1.2 问题定义

        给定:

  • 用户集合 \mathcal{U} = {u_1, u_2, \ldots, u_m}
  • 项目集合 \mathcal{I} = {i_1, i_2, \ldots, i_n}
  • 用户-项目交互数据 D^+ = {(u, i) | u \in \mathcal{U}, i \in \mathcal{I}},表示用户 u 对项目 i 有正反馈(如点击、购买等)

        对于每个用户 u,BPR的目标是使用户对正反馈项目 i 的偏好大于对负反馈项目 j 的偏好,即:

\hat{x}_{u,i} > \hat{x}_{u,j}, \quad \forall (u, i, j) \in D

        其中 \hat{x}_{u,i} 是用户u对项目 i 的预测得分。

        事实上,细心的同学已经可以看出来,BPR隐含了某种负采样策略。由于对于所有用户来说,比较每个正样本与每个负样本在计算上是不可行的,因此引入了一种采样策略,它随机选择一个三元组——一个用户、一个正样本(用户已交互的物品)和一个负样本(用户未交互的物品),来更新模型参数。

2.1.3 偏好函数

        BPR引入了一个偏好函数来建模用户 u 对项目 i 和 j 的偏好差异:

\hat{x}_{u,ij} = \hat{x}_{u,i} - \hat{x}_{u,j}

        其中,\hat{x}_{u,ij}是用户 u 对项目 i 和 j 的偏好差。

2.1.4 优化目标

        BPR通过最大化用户对项目的相对排序的后验概率,来优化模型参数。其优化目标函数为:

\max_{\Theta} \prod_{(u,i,j) \in D} \sigma(\hat{x}_{u,ij})

        其中:

  • \Theta 是模型参数
  • \sigma(x) = \frac{1}{1 + e^{-x}} 是sigmoid函数,用来将偏好差映射到概率空间

        通过对数似然函数变换,目标函数可以表示为:

\max_{\Theta} \sum_{(u,i,j) \in D} \ln \sigma(\hat{x}_{u,ij}) - \lambda \|\Theta\|^2

        其中,\lambda |\Theta|^2 是用于防止过拟合的正则化项。

关于优化目标的推导,我们这边进行展开说明一下:

        BPR的目标是最大化用户 u 对项目 i 和 j 的偏好差的后验概率 P(\Theta | D),其中 \Theta 是模型的参数。

        根据贝叶斯定理,后验概率为:

P(\Theta | D) = \frac{P(D | \Theta) P(\Theta)}{P(D)}

        其中:

  • P(D | \Theta)是似然函数,表示在给定参数 \Theta 下数据 D 的概率
  • P(\Theta) 是参数的先验分布
  • P(D) 是归一化常数

        取对数得到对数后验概率:

\ln P(\Theta | D) = \ln P(D | \Theta) + \ln P(\Theta) - \ln P(D)

        其中:

  • \ln P(D)是一个常数,与参数\Theta 无关,所以在优化过程中可以忽略。
  • \ln P(\Theta)是参数 \Theta 的先验分布。在BPR的设定中,通常假设参数服从零均值的高斯分布:

P(\Theta) \propto e^{-\frac{\lambda}{2} \|\Theta\|^2}

        取对数后,可以得到:

\ln P(\Theta) = -\frac{\lambda}{2} \|\Theta\|^2 + \text{constant}

        BPR通过假设每个用户偏好差的概率分布来定义似然函数。具体来说,对每个用户-项目对 (u, i, j),预期模型的得分 \hat{x}_{u,i} 比 \hat{x}_{u,j} 更高,概率为:

P((u, i, j) \in D | \Theta) = \sigma(\hat{x}_{u,ij}) = \frac{1}{1 + e^{-\hat{x}_{u,ij}}}

        其中:

  • \sigma(x) = \frac{1}{1 + e^{-x}}是 sigmoid 函数
  • \hat{x}_{u,ij} = \hat{x}_{u,i} - \hat{x}_{u,j} 是用户 u 对项目 i 和 j 的偏好差

        在整个数据集 D 上,似然函数为:

P(D | \Theta) = \prod_{(u,i,j) \in D} \sigma(\hat{x}_{u,ij})

        为了简化计算,通常最大化对数似然函数(log-likelihood):

\ln P(D | \Theta) = \sum_{(u,i,j) \in D} \ln \sigma(\hat{x}_{u,ij})

        由于最大化后验概率 P(\Theta | D) 等价于最大化 P(D | \Theta) P(\Theta),因此最终的优化目标公式为如下,这个目标函数试图通过最大化用户偏好正确排序的对数概率。

\max_{\Theta} \ln P(D | \Theta) + \ln P(\Theta) = \sum_{(u,i,j) \in D} \ln \sigma(\hat{x}_{u,ij}) - \lambda_{\Theta} \|\Theta\|^2        

2.1.5 模型假设

        BPR的模型假设是,预测得分 \hat{x}_{u,i} 可以由一个线性模型来表示,\theta 也就是对应的线性模型中的参数。

        例如在矩阵分解模型中:

\hat{x}_{u,i} = \mathbf{p}_u^T \mathbf{q}_i

        其中:

  • \mathbf{p}_u 是用户 u 的潜在向量
  • \mathbf{q}_i是项目 i 的潜在向量

        用户对项目的偏好差为:

\hat{x}_{u,ij} = \mathbf{p}_u^T (\mathbf{q}_i - \mathbf{q}_j)

  • \Theta 表示模型的所有参数,包括用户和项目的潜在向量。
  • \sigma(x) = \frac{1}{1 + e^{-x}}​ 是 sigmoid 函数。
  • \hat{x}_{u,ij} = \hat{x}_{u,i} - \hat{x}_{u,j} = \mathbf{p}_u^T \mathbf{q}_i - \mathbf{p}_u^T \mathbf{q}_j = \mathbf{p}_u^T (\mathbf{q}_i - \mathbf{q}_j),表示用户 u 对项目 i 和 j 的偏好差。
  • \|\Theta\|^2 = (\|\mathbf{p}_u\|^2 + \|\mathbf{q}_i\|^2 + \|\mathbf{q}_j\|^2)是 L2 正则化项。

2.1.6 对参数求梯度

首先,将 BPR 目标函数重新整理如下:

L(\Theta) = \sum_{(u, i, j) \in D} \ln \sigma(\hat{x}_{u,ij}) - \frac{\lambda }{2}(\|\mathbf{p}_u\|^2 + \|\mathbf{q}_i\|^2 + \|\mathbf{q}_j\|^2)

对用户向量 \mathbf{p}_u​ 求梯度

\frac{\partial L}{\partial \mathbf{p}_u} = \sum_{(u, i, j) \in D} \frac{\partial}{\partial \mathbf{p}_u} \left( \ln \sigma(\hat{x}_{u,ij}) \right) - \frac{\lambda}{2} \frac{\partial}{\partial \mathbf{p}_u} (\|\mathbf{p}_u\|^2)

首先求解 \frac{\partial}{\partial \mathbf{p}_u} \left( \ln \sigma(\hat{x}_{u,ij}) \right)

\frac{\partial}{\partial \mathbf{p}_u} \ln \sigma(\hat{x}_{u,ij}) = \frac{1}{\sigma(\hat{x}_{u,ij})} \cdot \sigma(\hat{x}_{u,ij}) \cdot (1 - \sigma(\hat{x}_{u,ij})) \cdot \frac{\partial \hat{x}_{u,ij}}{\partial \mathbf{p}_u}

简化为:

\frac{\partial}{\partial \mathbf{p}_u} \ln \sigma(\hat{x}_{u,ij}) = (1 - \sigma(\hat{x}_{u,ij})) \cdot \frac{\partial \hat{x}_{u,ij}}{\partial \mathbf{p}_u}

因为:

\frac{\partial \hat{x}_{u,ij}}{\partial \mathbf{p}_u} = \frac{\partial}{\partial \mathbf{p}_u} \left( \mathbf{p}_u^T (\mathbf{q}_i - \mathbf{q}_j) \right) = \mathbf{q}_i - \mathbf{q}_j

所以:

\frac{\partial}{\partial \mathbf{p}_u} \ln \sigma(\hat{x}_{u,ij}) = (1 - \sigma(\hat{x}_{u,ij})) \cdot (\mathbf{q}_i - \mathbf{q}_j)

然后求解正则化项的梯度:

\frac{\partial}{\partial \mathbf{p}_u} (- \frac{\lambda}{2} \|\mathbf{p}_u\|^2) = - \lambda \mathbf{p}_u

因此,对用户向量 \mathbf{p}_u 的总梯度为:

\frac{\partial L}{\partial \mathbf{p}_u} = (1 - \sigma(\hat{x}_{u,ij})) \cdot (\mathbf{q}_i - \mathbf{q}_j) - \lambda \mathbf{p}_u

对正样本项目向量 \mathbf{q}_i 求梯度

同样地,对 \mathbf{q}_i求梯度:

\frac{\partial L}{\partial \mathbf{q}_i} = \sum_{(u, i, j) \in D} \frac{\partial}{\partial \mathbf{q}_i} \left( \ln \sigma(\hat{x}_{u,ij}) \right) - \frac{\lambda}{2} \frac{\partial}{\partial \mathbf{q}_i} (\|\mathbf{q}_i\|^2)

首先计算 \frac{\partial}{\partial \mathbf{q}_i} \ln \sigma(\hat{x}_{u,ij})

\frac{\partial}{\partial \mathbf{q}_i} \ln \sigma(\hat{x}_{u,ij}) = (1 - \sigma(\hat{x}_{u,ij})) \cdot \frac{\partial \hat{x}_{u,ij}}{\partial \mathbf{q}_i}

因为:

\frac{\partial \hat{x}_{u,ij}}{\partial \mathbf{q}_i} = \frac{\partial}{\partial \mathbf{q}_i} (\mathbf{p}_u^T \mathbf{q}_i) = \mathbf{p}_u

所以:

\frac{\partial}{\partial \mathbf{q}_i} \ln \sigma(\hat{x}_{u,ij}) = (1 - \sigma(\hat{x}_{u,ij})) \cdot \mathbf{p}_u

再计算正则化项的梯度:

\frac{\partial}{\partial \mathbf{q}_i} (- \frac{\lambda}{2} \|\mathbf{q}_i\|^2) = - \lambda \mathbf{q}_i

因此,对正样本项目向量 \mathbf{q}_i 的总梯度为:

\frac{\partial L}{\partial \mathbf{q}_i} = (1 - \sigma(\hat{x}_{u,ij})) \cdot \mathbf{p}_u - \lambda \mathbf{q}_i

对负样本项目向量 \mathbf{q}_j 求梯度

        同理得到:

\frac{\partial}{\partial \mathbf{q}_j} = -(1 - \sigma(\hat{x}_{u,ij})) \cdot \mathbf{p}_u - \lambda \mathbf{q}_j

参数的更新公式

        更新用户向量 \mathbf{p}_u​:

\mathbf{p}_u \leftarrow \mathbf{p}_u + \eta \left( (1 - \sigma(\hat{x}_{u,ij})) \cdot (\mathbf{q}_i - \mathbf{q}_j) - \lambda \mathbf{p}_u \right)

        更新正样本项目向量 \mathbf{q}_i

\mathbf{q}_i \leftarrow \mathbf{q}_i + \eta \left( (1 - \sigma(\hat{x}_{u,ij})) \cdot \mathbf{p}_u - \lambda \mathbf{q}_i \right)

        更新负样本项目向量 \mathbf{q}_j

\mathbf{q}_j \leftarrow \mathbf{q}_j + \eta \left( -(1 - \sigma(\hat{x}_{u,ij})) \cdot \mathbf{p}_u - \lambda \mathbf{q}_j \right)

        其中,\eta 是学习率,控制参数更新的步长。

2.1.7 BPR算法代码示例

        早期我们会自己手写BPR算法代码来使用,不过看到目前已有相关开源软件cornac开发了BPR算法,因此主要以开源实现【5,6,7】的理解及回顾为主。

代码示例(cython)【5,6】,仅用于源码学习,请安装cornac包使用

import multiprocessingcimport cython
from cython cimport floating, integral
from cython.parallel import parallel, prange
from libc.math cimport exp
from libcpp cimport bool
from libcpp.algorithm cimport binary_searchimport numpy as np
cimport numpy as np
from tqdm.auto import trangefrom ..recommender import Recommender
from ..recommender import ANNMixin, MEASURE_DOT
from ...exception import ScoreException
from ...utils import get_rng
from ...utils import fast_dot
from ...utils.common import scale
from ...utils.init_utils import zeros, uniformcdef extern from "recom_bpr.h" namespace "recom_bpr" nogil:cdef int get_thread_num()@cython.boundscheck(False)
cdef bool has_non_zero(integral[:] indptr, integral[:] indices,integral rowid, integral colid) noexcept nogil:"""Given a CSR matrix, returns whether the [rowid, colid] contains a non zero.Assumes the CSR matrix has sorted indices"""return binary_search(&indices[indptr[rowid]], &indices[indptr[rowid + 1]], colid)cdef class RNGVector(object):def __init__(self, int num_threads, long rows, int seed):rng = get_rng(seed)for i in range(num_threads):self.rng.push_back(mt19937(rng.randint(2 ** 31)))self.dist.push_back(uniform_int_distribution[long](0, rows))cdef inline long generate(self, int thread_id) noexcept nogil:return self.dist[thread_id](self.rng[thread_id])class BPR(Recommender, ANNMixin):"""Bayesian Personalized Ranking.Parameters----------k: int, optional, default: 10The dimension of the latent factors.max_iter: int, optional, default: 100Maximum number of iterations or the number of epochs for SGD.learning_rate: float, optional, default: 0.001The learning rate for SGD.lambda_reg: float, optional, default: 0.001The regularization hyper-parameter.use_bias: boolean, optional, default: TrueWhen True, item bias is used.num_threads: int, optional, default: 0Number of parallel threads for training. If num_threads=0, all CPU cores will be utilized.If seed is not None, num_threads=1 to remove randomness from parallelization.trainable: boolean, optional, default: TrueWhen False, the model will not be re-trained, and input of pre-trained parameters are required.verbose: boolean, optional, default: TrueWhen True, some running logs are displayed.init_params: dictionary, optional, default: NoneInitial parameters, e.g., init_params = {'U': user_factors, 'V': item_factors, 'Bi': item_biases}seed: int, optional, default: NoneRandom seed for weight initialization.If specified, training will take longer because of single-thread (no parallelization).References----------* Rendle, Steffen, Christoph Freudenthaler, Zeno Gantner, and Lars Schmidt-Thieme. \BPR: Bayesian personalized ranking from implicit feedback. In UAI, pp. 452-461. 2009."""def __init__(self, name='BPR', k=10, max_iter=100, learning_rate=0.001, lambda_reg=0.01,use_bias=True,num_threads=0, trainable=True, verbose=False, init_params=None, seed=None):super().__init__(name=name, trainable=trainable, verbose=verbose)self.k = kself.max_iter = max_iterself.learning_rate = learning_rateself.lambda_reg = lambda_regself.use_bias = use_biasself.seed = seedself.rng = get_rng(seed)if seed is not None:self.num_threads = 1elif num_threads > 0 and num_threads < multiprocessing.cpu_count():self.num_threads = num_threadselse:self.num_threads = multiprocessing.cpu_count()# Init params if providedself.init_params = {} if init_params is None else init_paramsself.u_factors = self.init_params.get('U', None)self.i_factors = self.init_params.get('V', None)self.i_biases = self.init_params.get('Bi', None)def _init(self):n_users, n_items = self.total_users, self.total_itemsif self.u_factors is None:self.u_factors = (uniform((n_users, self.k), random_state=self.rng) - 0.5) / self.kif self.i_factors is None:self.i_factors = (uniform((n_items, self.k), random_state=self.rng) - 0.5) / self.kself.i_biases = zeros(n_items) if self.i_biases is None or self.use_bias is False else self.i_biasesdef _prepare_data(self, train_set):X = train_set.matrix # csr_matrix# this basically calculates the 'row' attribute of a COO matrix# without requiring us to get the whole COO matrixuser_counts = np.ediff1d(X.indptr)user_ids = np.repeat(np.arange(train_set.num_users), user_counts).astype(X.indices.dtype)return X, user_counts, user_idsdef fit(self, train_set, val_set=None):"""Fit the model to observations.Parameters----------train_set: :obj:`cornac.data.Dataset`, requiredUser-Item preference data as well as additional modalities.val_set: :obj:`cornac.data.Dataset`, optional, default: NoneUser-Item preference data for model selection purposes (e.g., early stopping).Returns-------self : object"""Recommender.fit(self, train_set, val_set)self._init()if not self.trainable:return selfX, user_counts, user_ids = self._prepare_data(train_set)neg_item_ids = np.arange(train_set.num_items, dtype=np.int32)cdef:int num_threads = self.num_threadsRNGVector rng_pos = RNGVector(num_threads, len(user_ids) - 1, self.rng.randint(2 ** 31))RNGVector rng_neg = RNGVector(num_threads, train_set.num_items - 1, self.rng.randint(2 ** 31))with trange(self.max_iter, disable=not self.verbose) as progress:for epoch in progress:correct, skipped = self._fit_sgd(rng_pos, rng_neg, num_threads,user_ids, X.indices, neg_item_ids, X.indptr,self.u_factors, self.i_factors, self.i_biases)progress.set_postfix({"correct": "%.2f%%" % (100.0 * correct / (len(user_ids) - skipped)),"skipped": "%.2f%%" % (100.0 * skipped / len(user_ids))})if self.verbose:print('Optimization finished!')return self@cython.cdivision(True)@cython.boundscheck(False)@cython.wraparound(False)def _fit_sgd(self, RNGVector rng_pos, RNGVector rng_neg, int num_threads,integral[:] user_ids, integral[:] item_ids, integral[:] neg_item_ids, integral[:] indptr,floating[:, :] U, floating[:, :] V, floating[:] B):"""Fit the model parameters (U, V, B) with SGD"""cdef:long num_samples = len(user_ids), s, i_index, j_index, correct = 0, skipped = 0long num_items = self.num_itemsintegral f, i_id, j_id, thread_idfloating z, score, tempbool use_bias = self.use_biasfloating lr = self.learning_ratefloating reg = self.lambda_regint factors = self.kfloating * userfloating * item_ifloating * item_jwith nogil, parallel(num_threads=num_threads):thread_id = get_thread_num()for s in prange(num_samples, schedule='guided'):i_index = rng_pos.generate(thread_id)i_id = item_ids[i_index]j_index = rng_neg.generate(thread_id)j_id = neg_item_ids[j_index]# if the user has liked the item j, skip this for nowif has_non_zero(indptr, item_ids, user_ids[i_index], j_id):skipped += 1continue# get pointers to the relevant factorsuser, item_i, item_j = &U[user_ids[i_index], 0], &V[i_id, 0], &V[j_id, 0]# compute the scorescore = B[i_id] - B[j_id]for f in range(factors):score = score + user[f] * (item_i[f] - item_j[f])z = 1.0 / (1.0 + exp(score))if z < .5:correct += 1# update the factors via sgd.for f in range(factors):temp = user[f]user[f] += lr * (z * (item_i[f] - item_j[f]) - reg * user[f])item_i[f] += lr * (z * temp - reg * item_i[f])item_j[f] += lr * (-z * temp - reg * item_j[f])# update item biasesif use_bias:B[i_id] += lr * (z - reg * B[i_id])B[j_id] += lr * (-z - reg * B[j_id])return correct, skippeddef score(self, user_idx, item_idx=None):"""Predict the scores/ratings of a user for an item.Parameters----------user_idx: int, requiredThe index of the user for whom to perform score prediction.item_idx: int, optional, default: NoneThe index of the item for which to perform score prediction.If None, scores for all known items will be returned.Returns-------res : A scalar or a Numpy arrayRelative scores that the user gives to the item or to all known items"""if item_idx is None:known_item_scores = np.copy(self.i_biases)fast_dot(self.u_factors[user_idx], self.i_factors, known_item_scores)return known_item_scoreselse:item_score = self.i_biases[item_idx]item_score += np.dot(self.u_factors[user_idx], self.i_factors[item_idx])return item_scoredef get_vector_measure(self):"""Getting a valid choice of vector measurement in ANNMixin._measures.Returns-------measure: MEASURE_DOTDot product aka. inner product"""return MEASURE_DOTdef get_user_vectors(self):"""Getting a matrix of user vectors serving as query for ANN search.Returns-------out: numpy.arrayMatrix of user vectors for all users available in the model. """user_vectors = np.concatenate((self.u_factors, np.ones([self.u_factors.shape[0], 1])), axis=1)return user_vectorsdef get_item_vectors(self):"""Getting a matrix of item vectors used for building the index for ANN search.Returns-------out: numpy.arrayMatrix of item vectors for all items available in the model. """item_vectors = np.concatenate((self.i_factors, self.i_biases.reshape((-1, 1))), axis=1)return item_vectors

使用示例【7】:

import os
import sys
import cornac
import pandas as pdfrom recommenders.datasets import movielens
from recommenders.datasets.python_splitters import python_random_split
from recommenders.evaluation.python_evaluation import map, ndcg_at_k, precision_at_k, recall_at_k
from recommenders.models.cornac.cornac_utils import predict_ranking
from recommenders.utils.timer import Timer
from recommenders.utils.constants import SEED
from recommenders.utils.notebook_utils import store_metadataprint(f"System version: {sys.version}")
print(f"Cornac version: {cornac.__version__}")# Select MovieLens data size: 100k, 1m, 10m, or 20m
MOVIELENS_DATA_SIZE = '100k'# top k items to recommend
TOP_K = 10# Model parameters
NUM_FACTORS = 200
NUM_EPOCHS = 100data = movielens.load_pandas_df(size=MOVIELENS_DATA_SIZE,header=["userID", "itemID", "rating"]
)data.head()train, test = python_random_split(data, 0.75)train_set = cornac.data.Dataset.from_uir(train.itertuples(index=False), seed=SEED)print('Number of users: {}'.format(train_set.num_users))
print('Number of items: {}'.format(train_set.num_items))bpr = cornac.models.BPR(k=NUM_FACTORS,max_iter=NUM_EPOCHS,learning_rate=0.01,lambda_reg=0.001,verbose=True,seed=SEED
)with Timer() as t:bpr.fit(train_set)
print("Took {} seconds for training.".format(t))with Timer() as t:all_predictions = predict_ranking(bpr, train, usercol='userID', itemcol='itemID', remove_seen=True)
print("Took {} seconds for prediction.".format(t))all_predictions.head()k = 10
eval_map = map(test, all_predictions, col_prediction='prediction', k=k)
eval_ndcg = ndcg_at_k(test, all_predictions, col_prediction='prediction', k=k)
eval_precision = precision_at_k(test, all_predictions, col_prediction='prediction', k=k)
eval_recall = recall_at_k(test, all_predictions, col_prediction='prediction', k=k)print("MAP:\t%f" % eval_map,"NDCG:\t%f" % eval_ndcg,"Precision@K:\t%f" % eval_precision,"Recall@K:\t%f" % eval_recall, sep='\n')# Record results for tests - ignore this cell
store_metadata("map", eval_map)
store_metadata("ndcg", eval_ndcg)
store_metadata("precision", eval_precision)
store_metadata("recall", eval_recall)

2.2 ALS 矩阵分解

2.2.1 算法背景

        之所以介绍基于ALS的矩阵分解,是因为在面对大规模数据的场景,往往需要处理海量的样本数据。我们曾在拥有大量文本资讯的平台中引入了spark作为基础设施,其中Spark MLlib 提供了一种协同过滤算法ALS,可以用于训练矩阵分解模型,该模型用于预测用户对物品的显性或隐性评分,从而实现推荐【8】。相对于BPR, ALS的矩阵分解,更容易理解。    

2.2.2 矩阵分解算法分析

        矩阵分解是一种在推荐任务中常用的技术。矩阵分解算法试图找到代表用户和物品内在属性的潜在因子,以降低维度。即:

\hat{r}_{ui} = p_u^T q_i

        其中,\hat{r}_{ui} 是用户 u 对物品 i 的预测评分,p_u​ 和 q_i 分别是用户和物品的潜在因子。矩阵分解问题的挑战在于找到合适的 p_u​ 和 q_i, 可以通过矩阵分解方法来实现。因此算法需要实现分解结果尽可能接近观测评分。在显性评分不可用的情况下,通常使用隐性评分,这些评分通常来自用户与物品的历史交互(例如点击、浏览、购买等)。r_{ui}是用户偏好的数值表示(例如点击次数等)。为了避免过拟合问题,学习过程需要进行正则化。矩阵分解算法的基本形式如下:

\min_{p_u, q_i} \sum_{(u, i) \in D} (r_{ui} - p_u^T q_i)^2 + \lambda (||p_u||^2 + ||q_i||^2)

2.2.3 基于交替最小二乘法 (ALS)的潜在因子参数更新

        由于损失函数中的正则项使其成为非凸问题,可以应用梯度下降方法,但这会导致较高的计算成本。因此,Spark MLlib采用交替最小二乘法 (ALS) 算法来解决。

        ALS 的基本思想是每次优化时,只学习 p_u​ 和 q_i 中的一个,而将另一个保持不变。这样使得每次迭代中的目标函数都是凸的且可解的。当 p_u 和 q_i 交替优化达到收敛时,算法停止。这种迭代计算可以并行化或者分布式处理,因此比较适合数据集非常大、用户-物品评分矩阵非常稀疏的情况。

 ALS 的交替优化过程详细解释:

假设我们有一个用户-物品评分矩阵 R,其中:

  • R_{ui}​ 是用户 u 对物品 i 的评分。
  • P 是用户的潜在因子矩阵,每行表示一个用户的潜在因子向量 p_u
  • Q 是物品的潜在因子矩阵,每行表示一个物品的潜在因子向量 q_i

        ALS 的目标是通过最小化以下目标函数来找到用户因子矩阵 P 和物品因子矩阵 Q:

\min_{P, Q} \sum_{(u, i) \in D} (R_{ui} - p_u^T q_i)^2 + \lambda \left( \sum_{u} ||p_u||^2 + \sum_{i} ||q_i||^2 \right)

其中:

  • R_{ui} 是已知的评分。
  • p_u​ 是用户 u 的潜在因子向量。
  • q_i 是物品 i 的潜在因子向量。
  • D 是用户-物品评分数据集的集合。
  • \lambda 是正则化参数,防止过拟合。

交替优化步骤

        由于目标函数是非凸(双线性形式的乘积导致的非凸性)的,同时优化 P 和 Q 会比较困难。因此,ALS 使用以下交替优化步骤:

        固定物品因子矩阵 Q,优化用户因子矩阵 P:

        当物品因子矩阵 Q 固定时,优化用户因子矩阵 P。对于每个用户 u,目标是找到一个最优的 p_u​ 来最小化以下子问题:

p_u = \arg\min_{p_u} \sum_{i: (u, i) \in D} (R_{ui} - p_u^T q_i)^2 + \lambda ||p_u||^2

        这个问题是一个标准的线性最小二乘问题,因为对 p_u 而言,它的目标函数是凸的。通过求解线性方程组得到用户因子矩阵的解。

        固定用户因子矩阵 P,优化物品因子矩阵 Q:

        类似地,当用户因子矩阵 P 固定时,优化物品因子矩阵 Q。对于每个物品 i,目标是找到一个最优的 q_i 来最小化以下子问题:

q_i = \arg\min_{q_i} \sum_{u: (u, i) \in D} (R_{ui} - p_u^T q_i)^2 + \lambda ||q_i||^2

        这个问题也是一个线性最小二乘问题,同样可以通过求解线性方程组得到物品因子矩阵的解。

交替更新

        ALS 通过交替地执行上述两个步骤,不断更新 P 和 Q,直到收敛(即两者变化很小或者达到预设的迭代次数)。具体来说,每次迭代会:

  1. 固定 Q,更新 P:计算所有用户的最优潜在因子向量 p_u​。
  2. 固定 P,更新 Q:计算所有物品的最优潜在因子向量 q_i

交替优化数学求解过程:

        (1) 固定 Q,优化 P:

        对于给定的用户 u,我们最小化以下目标函数:

\min_{p_u} \sum_{i \in I_u} (R_{ui} - p_u^T q_i)^2 + \lambda ||p_u||^2

        其中,I_u 是用户 u 评分的物品集合。展开后,可以得到:

       \min_{p_u} \left( p_u^T \left( \sum_{i \in I_u} q_i q_i^T \right) p_u - 2 \sum_{i \in I_u} R_{ui} (q_i^T p_u) + \lambda ||p_u||^2 \right)

        令导数为 0,得到:

\left( \sum_{i \in I_u} q_i q_i^T + \lambda I \right) p_u = \sum_{i \in I_u} R_{ui}q_i

        这是一个线性方程组,可以解得 p_u

        (2) 固定 P,优化 Q:

        对于给定的物品 i,最小化以下目标函数:

\min_{q_i} \sum_{u \in U_i} (R_{ui} - p_u^T q_i)^2 + \lambda ||q_i||^2

        其中,U_i 是给定物品 i 的评分用户集合。类似地,导数为零时得到:

\left( \sum_{u \in U_i} p_u p_u^T + \lambda I \right) q_i = \sum_{u \in U_i} R_{ui} p_u

        这同样是一个线性方程组,可以解得 q_i。        

2.2.4 Spark MLlib 实现

        矩阵分解算法在 Spark ML 中作为 ALS 模块提供给 DataFrame,在 Spark MLlib 中提供给 RDD。ALS通过使用“交替最小二乘法”方法来分布矩阵分解模型的训练【8】。

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)import sys
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
import seaborn as snsimport pyspark
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.ml.tuning import CrossValidator
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import FloatType, IntegerType, LongTypefrom recommenders.datasets import movielens
from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.evaluation.spark_evaluation import SparkRankingEvaluation, SparkRatingEvaluation
from recommenders.tuning.parameter_sweep import generate_param_grid
from recommenders.datasets.spark_splitters import spark_random_splitprint(f"System version: {sys.version}")
print(f"Pandas version: {pd.__version__}")
print(f"PySpark version: {pyspark.__version__}")MOVIELENS_DATA_SIZE = "100k"COL_USER = "UserId"
COL_ITEM = "MovieId"
COL_RATING = "Rating"
COL_PREDICTION = "prediction"
COL_TIMESTAMP = "Timestamp"schema = StructType((StructField(COL_USER, IntegerType()),StructField(COL_ITEM, IntegerType()),StructField(COL_RATING, FloatType()),StructField(COL_TIMESTAMP, LongType()),)
)RANK = 10
MAX_ITER = 15
REG_PARAM = 0.05
K = 10spark = start_or_get_spark("ALS", memory="16g")
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")dfs = movielens.load_spark_df(spark=spark, size=MOVIELENS_DATA_SIZE, schema=schema)dfs_train, dfs_test = spark_random_split(dfs, ratio=0.75, seed=42)als = ALS(maxIter=MAX_ITER, rank=RANK,regParam=REG_PARAM, userCol=COL_USER, itemCol=COL_ITEM, ratingCol=COL_RATING, coldStartStrategy="drop"
)model = als.fit(dfs_train)dfs_pred = model.transform(dfs_test).drop(COL_RATING)evaluations = SparkRatingEvaluation(dfs_test, dfs_pred,col_user=COL_USER,col_item=COL_ITEM,col_rating=COL_RATING,col_prediction=COL_PREDICTION
)print("RMSE score = {}".format(evaluations.rmse()),"MAE score = {}".format(evaluations.mae()),"R2 score = {}".format(evaluations.rsquared()),"Explained variance score = {}".format(evaluations.exp_var()),sep="\n"
)# Get the cross join of all user-item pairs and score them.
users = dfs_train.select(COL_USER).distinct()
items = dfs_train.select(COL_ITEM).distinct()
user_item = users.crossJoin(items)
dfs_pred = model.transform(user_item)# Remove seen items.
dfs_pred_exclude_train = dfs_pred.alias("pred").join(dfs_train.alias("train"),(dfs_pred[COL_USER] == dfs_train[COL_USER]) & (dfs_pred[COL_ITEM] == dfs_train[COL_ITEM]),how='outer'
)dfs_pred_final = dfs_pred_exclude_train.filter(dfs_pred_exclude_train["train.Rating"].isNull()) \.select('pred.' + COL_USER, 'pred.' + COL_ITEM, 'pred.' + "prediction")evaluations = SparkRankingEvaluation(dfs_test, dfs_pred_final,col_user=COL_USER,col_item=COL_ITEM,col_rating=COL_RATING,col_prediction=COL_PREDICTION,k=K
)print("Precision@k = {}".format(evaluations.precision_at_k()),"Recall@k = {}".format(evaluations.recall_at_k()),"NDCG@k = {}".format(evaluations.ndcg_at_k()),"Mean average precision = {}".format(evaluations.map_at_k()),sep="\n"
)

2.3 Wide & Deep Model

2.3.1 算法背景

        Wide & Deep Learning 【9】是一种结合了“广度模型”(Wide Model)和“深度模型”(Deep Model)的推荐系统架构,预期是能同时捕获记忆(memorization)和泛化(generalization)的能力,以提升推荐系统的效果。        

        推荐系统通常面临两个挑战:

  1. 记忆(Memorization):利用历史数据捕捉特征之间的已知关系。例如,“用户 A 过去购买了产品 B,因此推荐类似的产品”。
  2. 泛化(Generalization):通过捕捉特征之间的复杂关系来进行推理。例如,即使用户从未购买过某产品,系统也可以通过其他相似用户的行为来进行推荐

        “广度模型”擅长记忆,而“深度模型”擅长泛化。Wide & Deep Learning 将这两种模型结合起来,从而同时拥有记忆和泛化的能力。 

2.3.2 Wide & Deep Learning 结构

        Wide & Deep Learning 模型的结构通常由两部分组成:

  1. Wide 部分:

    • 这是一个基于线性模型的部分。它使用特征和特征交叉(feature crosses)来学习特定的记忆规则。
    • 在公式中,Wide 部分通常是一个线性模型: y_{\text{wide}} = w^T x + b
    • 其中,x 是输入特征,w 是权重向量,b 是偏置。
  2. Deep 部分

    • 这是一个基于深度神经网络的部分。它使用多层感知器(MLP),可以捕捉输入特征的高阶特征交互。
    • 深度模型的输出为:y_{\text{deep}} = f_{\text{DNN}}(x)
    • 其中,f_{\text{DNN}}​ 表示深度神经网络的非线性函数。
  3. 组合部分

    • 最后的输出是 Wide 部分和 Deep 部分输出的组合,一般表示为: y = \sigma(y_{\text{wide}} + y_{\text{deep}})
    • 其中,\sigma 是一个激活函数(如 sigmoid 或 softmax),用于将输出转换为概率。

2.3.3 Wide and Deep model的数学推导

        W&D模型本身是一个比较简单的模型,结构清晰容易理解。这里做下简单介绍数学部分内容。

        Wide 部分是一个线性模型,目标是直接对输入特征进行加权求和。假设输入特征为 x,权重为 w,偏置项为 b,则 Wide 部分的输出为:

y_{\text{wide}} = w^T x + b

        其中:

  • x 是输入特征向量,可以是稀疏的特征,如类别特征的 One-Hot 编码。
  • w 是与输入特征相对应的权重向量。
  • b 是偏置项。

        Wide 部分通常用于记忆一些固定的特征交互(如历史特征、规则特征等)。

        Deep 部分是一个多层的神经网络,用于从输入数据中学习复杂的非线性特征关系。

        假设输入特征为 x,网络有 L 层,每一层的权重为 W^{(l)} 和偏置为 b^{(l)}(其中 l = 1, 2, \dots, L)。Deep 部分的输出可以表示为:

a^{(1)} = f(W^{(1)} x + b^{(1)})

a^{(2)} = f(W^{(2)} a^{(1)} + b^{(2)})

\vdots

a^{(L)} = f(W^{(L)} a^{(L-1)} + b^{(L)})

        最终,Deep 部分的输出为 y_{\text{deep}} = a^{(L)},其中f(\cdot) 是激活函数(如 ReLU, Sigmoid)。

        Wide 部分和 Deep 部分的输出可以被组合在一起形成最终的预测。假设组合方法为线性加权求和,则最终的模型输出为:

\hat{y} = \sigma(y_{\text{wide}} + y_{\text{deep}})

其中:

  • \hat{y} 是最终的预测值。
  • \sigma(\cdot) 是输出层的激活函数,通常是 Sigmoid(用于二分类任务)或 Softmax(用于多分类任务)。

        对于分类任务,通常使用交叉熵损失函数:

\text{Loss} = - \frac{1}{N} \sum_{i=1}^{N} \left[ y_i \log(\hat{y}_i) + (1 - y_i) \log(1 - \hat{y}_i) \right]

        其中:

  • y_i 是第 i 个样本的真实标签。
  • \hat{y}_i 是模型的预测输出。

        Wide & Deep 模型的训练是通过最小化损失函数来更新模型参数。通常使用反向传播算法和随机梯度下降(SGD)或其变种(如 Adam, RMSProp)来进行训练。

        训练过程如下:

  1. 前向传播: 计算 Wide 部分和 Deep 部分的输出,并将其组合得到最终预测值。
  2. 计算损失: 使用预测值和真实标签计算损失函数。
  3. 反向传播: 计算损失函数相对于模型参数的梯度。
  4. 参数更新: 根据梯度更新参数。

Wide 部分的梯度更新公式

        假设 Wide 部分的输入为特征向量 x,权重向量为 w,偏置为 b。Wide 部分的输出为:

y_{\text{wide}} = w^T x + b

        对损失函数\text{Loss}(例如交叉熵损失)求导来更新权重 w 和偏置 b。

        损失函数相对于权重 w 的梯度为:

\frac{\partial \text{Loss}}{\partial w} = \frac{\partial \text{Loss}}{\partial \hat{y}} \cdot \frac{\partial \hat{y}}{\partial y_{\text{wide}}} \cdot \frac{\partial y_{\text{wide}}}{\partial w}

        因为:

\frac{\partial y_{\text{wide}}}{\partial w} = x

        所以:

\frac{\partial \text{Loss}}{\partial w} = \delta \cdot x

        其中,\delta = \frac{\partial \text{Loss}}{\partial \hat{y}} \cdot \frac{\partial \hat{y}}{\partial y_{\text{wide}}} 是损失函数相对于 Wide 部分输出的梯度。

        损失函数相对于偏置 b 的梯度为:

\frac{\partial \text{Loss}}{\partial b} = \delta

Deep 部分的梯度更新公式

        Deep 部分是一个多层的神经网络,包含多层权重和偏置。假设第 l 层的权重为 W^{(l)},偏置为 b^{(l)},输入为 a^{(l-1)},输出为 a^{(l)}(通过激活函数f(\cdot) 得到)。则第 l 层的输出为:

z^{(l)} = W^{(l)} a^{(l-1)} + b^{(l)}, \quad a^{(l)} = f(z^{(l)})

        损失函数相对于权重W^{(l)} 的梯度为:

\frac{\partial \text{Loss}}{\partial W^{(l)}} = \delta^{(l)} \cdot (a^{(l-1)})^T

        其中,\delta^{(l)} = \frac{\partial \text{Loss}}{\partial z^{(l)}} = \frac{\partial \text{Loss}}{\partial a^{(l)}} \cdot f'(z^{(l)})是第 l 层的误差项,f'(z^{(l)})是激活函数的导数。

        对第 l 层偏置 b^{(l)}的梯度

        损失函数相对于偏置 b^{(l)} 的梯度为:

\frac{\partial \text{Loss}}{\partial b^{(l)}} = \delta^{(l)}

参数更新

        使用梯度下降法(例如 SGD 或 Adam 等优化方法)更新模型的参数。

        对 Wide 部分的权重和偏置更新:

w \leftarrow w - \eta \cdot \frac{\partial \text{Loss}}{\partial w}

b \leftarrow b - \eta \cdot \frac{\partial \text{Loss}}{\partial b}

        对 Deep 部分的权重和偏置更新:

W^{(l)} \leftarrow W^{(l)} - \eta \cdot \frac{\partial \text{Loss}}{\partial W^{(l)}}

b^{(l)} \leftarrow b^{(l)} - \eta \cdot \frac{\partial \text{Loss}}{\partial b^{(l)}}

2.3.4 应用示例

2.3.4.1 原论文应用示例结构

2.3.4.2 电影推荐示例

        【10,11, 12】基于movielens数据,给出了电影打分推荐的代码示例。这里主要展示wide&deep的模型构建代码(tf版本以及torch版本)(仅供学习参考)。

TF版本

import tensorflow as tffrom recommenders.utils.constants import DEFAULT_USER_COL, DEFAULT_ITEM_COL
from recommenders.utils.tf_utils import MODEL_DIRdef build_feature_columns(users,items,user_col=DEFAULT_USER_COL,item_col=DEFAULT_ITEM_COL,item_feat_col=None,crossed_feat_dim=1000,user_dim=8,item_dim=8,item_feat_shape=None,model_type="wide_deep",
):"""Build wide and/or deep feature columns for TensorFlow high-level API Estimator.Args:users (iterable): Distinct user ids.items (iterable): Distinct item ids.user_col (str): User column name.item_col (str): Item column name.item_feat_col (str): Item feature column name for 'deep' or 'wide_deep' model.crossed_feat_dim (int): Crossed feature dimension for 'wide' or 'wide_deep' model.user_dim (int): User embedding dimension for 'deep' or 'wide_deep' model.item_dim (int): Item embedding dimension for 'deep' or 'wide_deep' model.item_feat_shape (int or an iterable of integers): Item feature array shape for 'deep' or 'wide_deep' model.model_type (str): Model type, either'wide' for a linear model,'deep' for a deep neural networks, or'wide_deep' for a combination of linear model and neural networks.Returns:list, list:- The wide feature columns- The deep feature columns. If only the wide model is selected, the deep column list is empty and viceversa."""if model_type not in ["wide", "deep", "wide_deep"]:raise ValueError("Model type should be either 'wide', 'deep', or 'wide_deep'")user_ids = tf.feature_column.categorical_column_with_vocabulary_list(user_col, users)item_ids = tf.feature_column.categorical_column_with_vocabulary_list(item_col, items)if model_type == "wide":return _build_wide_columns(user_ids, item_ids, crossed_feat_dim), []elif model_type == "deep":return ([],_build_deep_columns(user_ids, item_ids, user_dim, item_dim, item_feat_col, item_feat_shape),)elif model_type == "wide_deep":return (_build_wide_columns(user_ids, item_ids, crossed_feat_dim),_build_deep_columns(user_ids, item_ids, user_dim, item_dim, item_feat_col, item_feat_shape),)def _build_wide_columns(user_ids, item_ids, hash_bucket_size=1000):"""Build wide feature (crossed) columns. `user_ids` * `item_ids` are hashed into `hash_bucket_size`Args:user_ids (tf.feature_column.categorical_column_with_vocabulary_list): User ids.item_ids (tf.feature_column.categorical_column_with_vocabulary_list): Item ids.hash_bucket_size (int): Hash bucket size.Returns:list: Wide feature columns."""# Including the original features in addition to the crossed one is recommended to address hash collision problem.return [user_ids,item_ids,tf.feature_column.crossed_column([user_ids, item_ids], hash_bucket_size=hash_bucket_size),]def _build_deep_columns(user_ids, item_ids, user_dim, item_dim, item_feat_col=None, item_feat_shape=1
):"""Build deep feature columnsArgs:user_ids (tf.feature_column.categorical_column_with_vocabulary_list): User ids.item_ids (tf.feature_column.categorical_column_with_vocabulary_list): Item ids.user_dim (int): User embedding dimension.item_dim (int): Item embedding dimension.item_feat_col (str): Item feature column name.item_feat_shape (int or an iterable of integers): Item feature array shape.Returns:list: Deep feature columns."""deep_columns = [# User embeddingtf.feature_column.embedding_column(categorical_column=user_ids, dimension=user_dim, max_norm=user_dim**0.5),# Item embeddingtf.feature_column.embedding_column(categorical_column=item_ids, dimension=item_dim, max_norm=item_dim**0.5),]# Item featureif item_feat_col is not None:deep_columns.append(tf.feature_column.numeric_column(item_feat_col, shape=item_feat_shape, dtype=tf.float32))return deep_columnsdef build_model(model_dir=MODEL_DIR,wide_columns=(),deep_columns=(),linear_optimizer="Ftrl",dnn_optimizer="Adagrad",dnn_hidden_units=(128, 128),dnn_dropout=0.0,dnn_batch_norm=True,log_every_n_iter=1000,save_checkpoints_steps=10000,seed=None,
):"""Build wide-deep model.To generate wide model, pass wide_columns only.To generate deep model, pass deep_columns only.To generate wide_deep model, pass both wide_columns and deep_columns.Args:model_dir (str): Model checkpoint directory.wide_columns (list of tf.feature_column): Wide model feature columns.deep_columns (list of tf.feature_column): Deep model feature columns.linear_optimizer (str or tf.train.Optimizer): Wide model optimizer name or object.dnn_optimizer (str or tf.train.Optimizer): Deep model optimizer name or object.dnn_hidden_units (list of int): Deep model hidden units. E.g., [10, 10, 10] is three layers of 10 nodes each.dnn_dropout (float): Deep model's dropout rate.dnn_batch_norm (bool): Deep model's batch normalization flag.log_every_n_iter (int): Log the training loss for every n steps.save_checkpoints_steps (int): Model checkpoint frequency.seed (int): Random seed.Returns:tf.estimator.Estimator: Model"""gpu_config = tf.compat.v1.ConfigProto()gpu_config.gpu_options.allow_growth = True  # dynamic memory allocation# TensorFlow training setupconfig = tf.estimator.RunConfig(tf_random_seed=seed,log_step_count_steps=log_every_n_iter,save_checkpoints_steps=save_checkpoints_steps,session_config=gpu_config,)if len(wide_columns) > 0 and len(deep_columns) == 0:model = tf.compat.v1.estimator.LinearRegressor(model_dir=model_dir,config=config,feature_columns=wide_columns,optimizer=linear_optimizer,)elif len(wide_columns) == 0 and len(deep_columns) > 0:model = tf.compat.v1.estimator.DNNRegressor(model_dir=model_dir,config=config,feature_columns=deep_columns,hidden_units=dnn_hidden_units,optimizer=dnn_optimizer,dropout=dnn_dropout,batch_norm=dnn_batch_norm,)elif len(wide_columns) > 0 and len(deep_columns) > 0:model = tf.compat.v1.estimator.DNNLinearCombinedRegressor(model_dir=model_dir,config=config,# wide settingslinear_feature_columns=wide_columns,linear_optimizer=linear_optimizer,# deep settingsdnn_feature_columns=deep_columns,dnn_hidden_units=dnn_hidden_units,dnn_optimizer=dnn_optimizer,dnn_dropout=dnn_dropout,batch_norm=dnn_batch_norm,)else:raise ValueError("To generate wide model, set wide_columns.\n""To generate deep model, set deep_columns.\n""To generate wide_deep model, set both wide_columns and deep_columns.")return model

torch版本

# -*- coding: utf-8 -*-
import numpy as np
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoaderuse_cuda = torch.cuda.is_available()class WideDeepLoader(Dataset):"""Helper to facilitate loading the data to the pytorch models.Parameters:--------data: namedtuple with 3 elements - (wide_input_data, deep_inp_data, target)"""def __init__(self, data):self.X_wide = data.wideself.X_deep = data.deepself.Y = data.labelsdef __getitem__(self, idx):xw = self.X_wide[idx]xd = self.X_deep[idx]y  = self.Y[idx]return xw, xd, ydef __len__(self):return len(self.Y)class WideDeep(nn.Module):""" Wide and Deep model. As explained in Heng-Tze Cheng et al., 2016, themodel taked the wide features and the deep features after being passed throughthe hidden layers and connects them to an output neuron. For details, pleaserefer to the paper and the corresponding tutorial in the tensorflow site:https://www.tensorflow.org/tutorials/wide_and_deepParameters:--------wide_dim (int) : dim of the wide-side input tensorembeddings_input (tuple): 3-elements tuple with the embeddings "set-up" -(col_name, unique_values, embeddings dim)continuous_cols (list) : list with the name of the continuum columnsdeep_column_idx (dict) : dictionary where the keys are column names and the valuestheir corresponding index in the deep-side input tensorhidden_layers (list) : list with the number of units per hidden layerencoding_dict (dict) : dictionary with the label-encode mappingn_class (int) : number of classes. Defaults to 1 if logistic or regressiondropout (float)"""def __init__(self,wide_dim,embeddings_input,continuous_cols,deep_column_idx,hidden_layers,dropout,encoding_dict,n_class):super(WideDeep, self).__init__()self.wide_dim = wide_dimself.deep_column_idx = deep_column_idxself.embeddings_input = embeddings_inputself.continuous_cols = continuous_colsself.hidden_layers = hidden_layersself.dropout = dropoutself.encoding_dict = encoding_dictself.n_class = n_class# Build the embedding layers to be passed through the deep-sidefor col,val,dim in self.embeddings_input:setattr(self, 'emb_layer_'+col, nn.Embedding(val, dim))# Build the deep-side hidden layers with dropout if specifiedinput_emb_dim = np.sum([emb[2] for emb in self.embeddings_input])self.linear_1 = nn.Linear(input_emb_dim+len(continuous_cols), self.hidden_layers[0])if self.dropout:self.linear_1_drop = nn.Dropout(self.dropout[0])for i,h in enumerate(self.hidden_layers[1:],1):setattr(self, 'linear_'+str(i+1), nn.Linear( self.hidden_layers[i-1], self.hidden_layers[i] ))if self.dropout:setattr(self, 'linear_'+str(i+1)+'_drop', nn.Dropout(self.dropout[i]))# Connect the wide- and dee-side of the model to the output neuron(s)self.output = nn.Linear(self.hidden_layers[-1]+self.wide_dim, self.n_class)def compile(self, method="logistic", optimizer="Adam", learning_rate=0.001, momentum=0.0):"""Wrapper to set the activation, loss and the optimizer.Parameters:----------method (str) : regression, logistic or multiclassoptimizer (str): SGD, Adam, or RMSprop"""if method == 'regression':self.activation, self.criterion = None, F.mse_lossif method == 'logistic':self.activation, self.criterion = F.sigmoid, F.binary_cross_entropyif method == 'multiclass':self.activation, self.criterion = F.softmax, F.cross_entropyif optimizer == "Adam":self.optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate)if optimizer == "RMSprop":self.optimizer = torch.optim.RMSprop(self.parameters(), lr=learning_rate)if optimizer == "SGD":self.optimizer = torch.optim.SGD(self.parameters(), lr=learning_rate, momentum=momentum)self.method = methoddef forward(self, X_w, X_d):"""Implementation of the forward pass.Parameters:----------X_w (torch.tensor) : wide-side input tensorX_d (torch.tensor) : deep-side input tensorReturns:--------out (torch.tensor) : result of the output neuron(s)"""# Deep Sideemb = [getattr(self, 'emb_layer_'+col)(X_d[:,self.deep_column_idx[col]].long())for col,_,_ in self.embeddings_input]if self.continuous_cols:cont_idx = [self.deep_column_idx[col] for col in self.continuous_cols]cont = [X_d[:, cont_idx].float()]deep_inp = torch.cat(emb+cont, 1)else:deep_inp = torch.cat(emb, 1)x_deep = F.relu(self.linear_1(deep_inp))if self.dropout:x_deep = self.linear_1_drop(x_deep)for i in range(1,len(self.hidden_layers)):x_deep = F.relu( getattr(self, 'linear_'+str(i+1))(x_deep) )if self.dropout:x_deep = getattr(self, 'linear_'+str(i+1)+'_drop')(x_deep)# Deep + Wide sideswide_deep_input = torch.cat([x_deep, X_w.float()], 1)if not self.activation:out = self.output(wide_deep_input)else:out = self.activation(self.output(wide_deep_input))return outdef fit(self, dataset, n_epochs, batch_size):"""Run the model for the training set at dataset.Parameters:----------dataset (dict): dictionary with the training sets -X_wide_train, X_deep_train, targetn_epochs (int)batch_size (int)"""widedeep_dataset = WideDeepLoader(dataset)train_loader = torch.utils.data.DataLoader(dataset=widedeep_dataset,batch_size=batch_size,shuffle=True)# set the model in training modenet = self.train()for epoch in range(n_epochs):total=0correct=0for i, (X_wide, X_deep, target) in enumerate(train_loader):X_w = Variable(X_wide)X_d = Variable(X_deep)y = (Variable(target).float() if self.method != 'multiclass' else Variable(target))if use_cuda:X_w, X_d, y = X_w.cuda(), X_d.cuda(), y.cuda()self.optimizer.zero_grad()y_pred =  net(X_w, X_d)loss = self.criterion(y_pred, y)loss.backward()self.optimizer.step()if self.method != "regression":total+= y.size(0)if self.method == 'logistic':y_pred_cat = (y_pred > 0.5).squeeze(1).float()if self.method == "multiclass":_, y_pred_cat = torch.max(y_pred, 1)correct+= float((y_pred_cat == y).sum().data[0])if self.method != "regression":print ('Epoch {} of {}, Loss: {}, accuracy: {}'.format(epoch+1,n_epochs, round(loss.data[0],3), round(correct/total,4)))else:print ('Epoch {} of {}, Loss: {}'.format(epoch+1, n_epochs,round(loss.data[0],3)))def predict(self, dataset):"""Predict target for dataset.Parameters:----------dataset (dict): dictionary with the testing dataset -X_wide_test, X_deep_test, targetReturns:--------array-like with the target for dataset"""X_w = Variable(torch.from_numpy(dataset.wide)).float()X_d = Variable(torch.from_numpy(dataset.deep))if use_cuda:X_w, X_d = X_w.cuda(), X_d.cuda()# set the model in evaluation mode so dropout is not appliednet = self.eval()pred = net(X_w,X_d).cpu()if self.method == "regression":return pred.squeeze(1).data.numpy()if self.method == "logistic":return (pred > 0.5).squeeze(1).data.numpy()if self.method == "multiclass":_, pred_cat = torch.max(pred, 1)return pred_cat.data.numpy()def predict_proba(self, dataset):"""Predict predict probability for dataset.This method will only work with method logistic/multiclassParameters:----------dataset (dict): dictionary with the testing dataset -X_wide_test, X_deep_test, targetReturns:--------array-like with the probability for dataset."""X_w = Variable(torch.from_numpy(dataset.wide)).float()X_d = Variable(torch.from_numpy(dataset.deep))if use_cuda:X_w, X_d = X_w.cuda(), X_d.cuda()# set the model in evaluation mode so dropout is not appliednet = self.eval()pred = net(X_w,X_d).cpu()if self.method == "logistic":pred = pred.squeeze(1).data.numpy()probs = np.zeros([pred.shape[0],2])probs[:,0] = 1-predprobs[:,1] = predreturn probsif self.method == "multiclass":return pred.data.numpy()def get_embeddings(self, col_name):"""Extract the embeddings for the embedding columns.Parameters:-----------col_name (str) : column we want the embedding forReturns:--------embeddings_dict (dict): dictionary with the column values and the embeddings"""params = list(self.named_parameters())emb_layers = [p for p in params if 'emb_layer' in p[0]]emb_layer  = [layer for layer in emb_layers if col_name in layer[0]][0]embeddings = emb_layer[1].cpu().data.numpy()col_label_encoding = self.encoding_dict[col_name]inv_dict = {v:k for k,v in col_label_encoding.iteritems()}embeddings_dict = {}for idx,value in inv_dict.iteritems():embeddings_dict[value] = embeddings[idx]return embeddings_dict

3. 参考材料

【1】从大数据到大模型搜索推荐技术的前沿探索

【2】深度学习在美团推荐平台排序中的运用

【3】LLM4RecSys

【4】BPR: Bayesian Personalized Ranking from Implicit Feedback

【5】cornac/cornac/models/bpr at master · PreferredAI/cornac · GitHub

【6】A Comparative Framework for Multimodal Recommender Systems.https://cornac.preferred.ai/

【7】Best Practices on Recommendation Systems

【8】als_deep_dive

【9】Wide & Deep Learning for Recommender Systems

【10】wide-deep-learning-for-recsys-with-pytorch

【11】wide_deep_utils

【12】Wide-and-Deep-PyTorch


http://www.ppmy.cn/news/1524080.html

相关文章

剪画:分享一款自媒体新手小白都在用的剪辑工具,收藏!

哈喽&#xff0c;大家好呀&#xff01; 自媒体新手小白们看过来&#xff01;今天要给大家介绍一款简单实用的视频剪辑工具 —— 剪画。 对于刚刚踏上自媒体之路的你来说&#xff0c;视频去水印功能简直是救星。当你找到一些精彩的素材&#xff0c;却被水印困扰时&#xff0c;剪…

理解Python中的生成器表达式(Generator Expression)

Python中的生成器表达式&#xff08;Generator Expression&#xff09;是一种简洁而强大的语法结构&#xff0c;它允许你以一种类似于列表推导式&#xff08;List Comprehension&#xff09;的方式快速生成迭代器&#xff0c;但相比之下&#xff0c;生成器表达式在内存使用上更…

Electron32-Vue3OS桌面管理os模板|vite5+electron32+arco后台os系统

原创新作electron32.xvue3arco.design仿ipad/windows桌面os系统。 基于最新跨平台技术Electron32、Vite5、Vue3 setup、Pinia2、Arco-Design、Echarts、Sortablejs实战开发桌面版osx管理系统。内置ipad/windows两种桌面风格模板、动态json配置桌面图标、自研栅格拖拽布局模板。…

[Python学习篇] Python搭建静态web服务器

Python内置的web静态服务器 Python内置的http.server模块可以快速启动一个简单的HTTP服务器。 在Python 3中&#xff0c;打开命令行或终端&#xff0c;在你想要作为静态服务器根目录的文件夹下&#xff0c;运行以下命令&#xff1a; python -m http.server 8000 这将会在…

来也PRA机器人程序代码语法

来也的代码&#xff0c;有点像VB&#xff0c;但又可以用“//”注释&#xff0c;用“.”调用子函数&#xff0c;还是很方便&#xff0c;也不分大小写&#xff0c;越简单越好。可惜官方的命令例子太敷衍&#xff0c;都没有实例数据说明。参数的值也没有明确说明。哎&#xff0c;还…

Flutter 中的低功耗蓝牙概述

随着智能设备数量的增加&#xff0c;控制这些设备的需求也在增加。对于多种使用情况&#xff0c;期望设备在需要进行控制的同时连接到互联网会受到很大限制&#xff0c;因此是不可行的。在这些情况下&#xff0c;使用低功耗蓝牙&#xff08;也称为 Bluetooth LE 或 BLE&#xf…

【数据结构】Lambda表达式的应用

前言&#xff1a; &#x1f31f;&#x1f31f;本期讲解关于Lambda表达式&#xff0c;希望能帮到屏幕前的你。 &#x1f308;上期博客在这里&#xff1a;http://t.csdnimg.cn/o8CCD &#x1f308;感兴趣的小伙伴看一看小编主页&#xff1a;GGBondlctrl-CSDN博客 目录 &#x1f…

DBeaver连接数据库报连接错误:Public Key Retrieval is not allowed

问题描述 使用DBeaver软件连接mysql数据库的时候&#xff0c;有如下提示信息&#xff1a; 解决办法 点击驱动属性->找到allowPublicKeyRetrieval这项&#xff0c;把值设置为TRUE,再点击连接测试 可以成功连接到mysql数据库&#xff0c;问题解决~

MySQL——数据库的高级操作(一)数据备份与还原(1)数据的备份

在操作数据库时&#xff0c;难免会发生一些意外造成数据丢失。例如&#xff0c;突然停电、管理员的操作失误都可能导致数据的丢失。为了确保数据的安全&#xff0c;需要定期对数据库进行备份&#xff0c;这样&#xff0c;当遇到数据库中数据丢失或者出错的情况&#xff0c;就可…

llvm后端之局部变量

llvm后端之局部变量 引言1 生成FrameIndexSDNode2 消除FrameIndex2.1 eliminateCallFramePseudoInstr2.2 eliminateFrameIndex 引言 llvm后端对局部变量(即alloc节点)的访问&#xff0c;首先&#xff0c;将对alloc节点转为FrameIndex&#xff0c;所有对alloc的load和store均用…

Pycharm配置ssh远程服务器解析器

算法学习、4对1辅导、论文辅导或核心期刊可以通过公众号滴滴我 文章目录 需求配置流程 需求 之前在开发中&#xff0c;Pycharm都是通过本机Python环境来解析。但有时候&#xff0c;可能受限于本机电脑配置原因&#xff0c;导致运行速度并不快。因此推荐大家尝试下&#xff0c…

深入解析Go语言中的条件控制与数据处理

Go语言中的switch语句 switch语句在Go语言中有着非常重要的作用&#xff0c;因为它不仅可以用于常见的条件判断&#xff0c;还可以结合正则表达式进行更复杂的字符串匹配。下面我们首先来看一个简单的switch语句示例&#xff1a; switch asString { case "1":fmt.P…

MATLAB实现PID参数自动整定

目录 1、项目说明 2、文件说明 1、项目说明 本项目旨在通过 MATLAB 语言实现 PID 参数的自动整定&#xff0c;并设计了一个直观易用的 GUI 界面。该系统特别适用于实验室环境下的 PID 参数自整定任务。整定的核心原则在于优化系统性能&#xff0c;使系统的衰减比尽可能接近理…

Ubuntu20.04 Docker中换apt源实录 (安装vim过慢解决方案)

1. 查看/etc/os-release $ cat /etc/os-releasePRETTY_NAME"Debian GNU/Linux 12 (bookworm)" NAME"Debian GNU/Linux" VERSION_ID"12" VERSION"12 (bookworm)" VERSION_CODENAMEbookworm IDdebian HOME_URL"https://www.debian…

Node.js入门与生态全解析:包管理与构建工具详解

Node.js入门与生态全解析&#xff1a;包管理与构建工具详解 目录 &#x1f3af; 包管理 使用 npm 和 yarn&#xff1a;项目依赖管理的利器创建和发布 npm 包&#xff1a;实现模块化与共享 ⚙️ 构建工具 使用 Webpack 和 Babel&#xff1a;高效打包与代码转换配置构建流程&am…

投资一家无人机培训机构技术详解

无人机培训机构是随着无人机技术的快速发展和普及而兴起的一种专业培训机构。这类机构专注于为学员提供无人机相关的理论知识、操控技能以及应用技术培训&#xff0c;以满足不同领域对无人机人才的需求。 1. 市场调研与定位 市场调研 在投资无人机培训机构之前&#xff0c;深…

NPM私库搭建-verdaccio(Linux)

1、安装node linux服务器安装node a)、官网下载所需的node版本 https://nodejs.org/dist/v14.21.0/b)、解压安装包 若下载的是xxx.tar.xz文件&#xff0c;解压命令为tar -xvf xxx.tar.xzc)、修改环境变量 修改&#xff1a;/etc/profile文件 #SET PATH FOR NODEJS export NOD…

Dubbo缓存

是的&#xff0c;Dubbo 可以对服务调用结果进行缓存。通过缓存结果&#xff0c;可以减少重复调用、降低服务提供者的负载&#xff0c;并提高系统的响应速度和吞吐量。Dubbo 内置了多种缓存机制&#xff0c;开发者可以根据不同的业务需求选择合适的缓存策略。 1. Dubbo 结果缓存…

Java面试篇基础部分-垃圾回收算法

大厂面试垃圾回收算法详解内容。 JVM内存垃圾对象的确定? Java开发过程中,使用引用计数和可达性分析。来确定一个对象是否是需要被回收的对象,如果是需要被回收的对象则,对其进行垃圾回收,如果不是则说明是存在引用的对象,则不能清理 首先来分析一下引用算法,如果这个时…

RPC框架-protobuf-rpc-pro

protobuf-rpc-pro 是一个基于 Protocol Buffers 的 RPC 框架&#xff0c;旨在通过使用 Google 的 Protocol Buffers&#xff08;Protobuf&#xff09;序列化格式实现高效、轻量的远程过程调用&#xff08;RPC&#xff09;。它主要用于 Java 生态系统&#xff0c;提供了简洁的 A…