完整地实现了推荐系统的构建、实验和评估过程,为不同推荐算法在同一数据集上的性能比较提供了可重复实验的框架

ops/2025/1/16 19:49:57/
python">{"cells": [{"cell_type": "markdown","metadata": {},"source": ["# 基于用户的协同过滤算法"]},{"cell_type": "code","execution_count": 1,"metadata": {},"outputs": [],"source": ["# 导入包\n","import random\n","import math\n","import time\n","from tqdm import tqdm"]},{"cell_type": "markdown","metadata": {},"source": ["## 一. 通用函数定义"]},{"cell_type": "code","execution_count": 2,"metadata": {},"outputs": [],"source": ["# 定义装饰器,监控运行时间\n","def timmer(func):\n","    def wrapper(*args, **kwargs):\n","        start_time = time.time()\n","        res = func(*args, **kwargs)\n","        stop_time = time.time()\n","        print('Func %s, run time: %s' % (func.__name__, stop_time - start_time))\n","        return res\n","    return wrapper"]},{"cell_type": "markdown","metadata": {},"source": ["### 1. 数据处理相关\n","1. load data\n","2. split data"]},{"cell_type": "code","execution_count": 3,"metadata": {},"outputs": [],"source": ["class Dataset():\n","    \n","    def __init__(self, fp):\n","        # fp: data file path\n","        self.data = self.loadData(fp)\n","    \n","    @timmer\n","    def loadData(self, fp):\n","        data = []\n","        for l in open(fp):\n","            data.append(tuple(map(int, l.strip().split('::')[:2])))\n","        return data\n","    \n","    @timmer\n","    def splitData(self, M, k, seed=1):\n","        '''\n","        :params: data, 加载的所有(user, item)数据条目\n","        :params: M, 划分的数目,最后需要取M折的平均\n","        :params: k, 本次是第几次划分,k~[0, M)\n","        :params: seed, random的种子数,对于不同的k应设置成一样的\n","        :return: train, test\n","        '''\n","        train, test = [], []\n","        random.seed(seed)\n","        for user, item in self.data:\n","            # 这里与书中的不一致,本人认为取M-1较为合理,因randint是左右都覆盖的\n","            if random.randint(0, M-1) == k:  \n","                test.append((user, item))\n","            else:\n","                train.append((user, item))\n","\n","        # 处理成字典的形式,user->set(items)\n","        def convert_dict(data):\n","            data_dict = {}\n","            for user, item in data:\n","                if user not in data_dict:\n","                    data_dict[user] = set()\n","                data_dict[user].add(item)\n","            data_dict = {k: list(data_dict[k]) for k in data_dict}\n","            return data_dict\n","\n","        return convert_dict(train), convert_dict(test)"]},{"cell_type": "markdown","metadata": {},"source": ["### 2. 评价指标\n","1. Precision\n","2. Recall\n","3. Coverage\n","4. Popularity(Novelty)"]},{"cell_type": "code","execution_count": 4,"metadata": {},"outputs": [],"source": ["class Metric():\n","    \n","    def __init__(self, train, test, GetRecommendation):\n","        '''\n","        :params: train, 训练数据\n","        :params: test, 测试数据\n","        :params: GetRecommendation, 为某个用户获取推荐物品的接口函数\n","        '''\n","        self.train = train\n","        self.test = test\n","        self.GetRecommendation = GetRecommendation\n","        self.recs = self.getRec()\n","        \n","    # 为test中的每个用户进行推荐\n","    def getRec(self):\n","        recs = {}\n","        for user in self.test:\n","            rank = self.GetRecommendation(user)\n","            recs[user] = rank\n","        return recs\n","        \n","    # 定义精确率指标计算方式\n","    def precision(self):\n","        all, hit = 0, 0\n","        for user in self.test:\n","            test_items = set(self.test[user])\n","            rank = self.recs[user]\n","            for item, score in rank:\n","                if item in test_items:\n","                    hit += 1\n","            all += len(rank)\n","        return round(hit / all * 100, 2)\n","    \n","    # 定义召回率指标计算方式\n","    def recall(self):\n","        all, hit = 0, 0\n","        for user in self.test:\n","            test_items = set(self.test[user])\n","            rank = self.recs[user]\n","            for item, score in rank:\n","                if item in test_items:\n","                    hit += 1\n","            all += len(test_items)\n","        return round(hit / all * 100, 2)\n","    \n","    # 定义覆盖率指标计算方式\n","    def coverage(self):\n","        all_item, recom_item = set(), set()\n","        for user in self.test:\n","            for item in self.train[user]:\n","                all_item.add(item)\n","            rank = self.recs[user]\n","            for item, score in rank:\n","                recom_item.add(item)\n","        return round(len(recom_item) / len(all_item) * 100, 2)\n","    \n","    # 定义新颖度指标计算方式\n","    def popularity(self):\n","        # 计算物品的流行度\n","        item_pop = {}\n","        for user in self.train:\n","            for item in self.train[user]:\n","                if item not in item_pop:\n","                    item_pop[item] = 0\n","                item_pop[item] += 1\n","\n","        num, pop = 0, 0\n","        for user in self.test:\n","            rank = self.recs[user]\n","            for item, score in rank:\n","                # 取对数,防止因长尾问题带来的被流行物品所主导\n","                pop += math.log(1 + item_pop[item])\n","                num += 1\n","        return round(pop / num, 6)\n","    \n","    def eval(self):\n","        metric = {'Precision': self.precision(),\n","                  'Recall': self.recall(),\n","                  'Coverage': self.coverage(),\n","                  'Popularity': self.popularity()}\n","        print('Metric:', metric)\n","        return metric"]},{"cell_type": "markdown","metadata": {},"source": ["## 二. 算法实现\n","1. Random\n","2. MostPopular\n","3. UserCF\n","4. UserIIF"]},{"cell_type": "code","execution_count": 5,"metadata": {},"outputs": [],"source": ["# 1. 随机推荐\n","def Random(train, K, N):\n","    '''\n","    :params: train, 训练数据集\n","    :params: K, 可忽略\n","    :params: N, 超参数,设置取TopN推荐物品数目\n","    :return: GetRecommendation,推荐接口函数\n","    '''\n","    items = {}\n","    for user in train:\n","        for item in train[user]:\n","            items[item] = 1\n","    \n","    def GetRecommendation(user):\n","        # 随机推荐N个未见过的\n","        user_items = set(train[user])\n","        rec_items = {k: items[k] for k in items if k not in user_items}\n","        rec_items = list(rec_items.items())\n","        random.shuffle(rec_items)\n","        return rec_items[:N]\n","    \n","    return GetRecommendation"]},{"cell_type": "code","execution_count": 6,"metadata": {},"outputs": [],"source": ["# 2. 热门推荐\n","def MostPopular(train, K, N):\n","    '''\n","    :params: train, 训练数据集\n","    :params: K, 可忽略\n","    :params: N, 超参数,设置取TopN推荐物品数目\n","    :return: GetRecommendation, 推荐接口函数\n","    '''\n","    items = {}\n","    for user in train:\n","        for item in train[user]:\n","            if item not in items:\n","                items[item] = 0\n","            items[item] += 1\n","        \n","    def GetRecommendation(user):\n","        # 随机推荐N个没见过的最热门的\n","        user_items = set(train[user])\n","        rec_items = {k: items[k] for k in items if k not in user_items}\n","        rec_items = list(sorted(rec_items.items(), key=lambda x: x[1], reverse=True))\n","        return rec_items[:N]\n","    \n","    return GetRecommendation"]},{"cell_type": "code","execution_count": 7,"metadata": {},"outputs": [],"source": ["# 3. 基于用户余弦相似度的推荐\n","def UserCF(train, K, N):\n","    '''\n","    :params: train, 训练数据集\n","    :params: K, 超参数,设置取TopK相似用户数目\n","    :params: N, 超参数,设置取TopN推荐物品数目\n","    :return: GetRecommendation, 推荐接口函数\n","    '''\n","    # 计算item->user的倒排索引\n","    item_users = {}\n","    for user in train:\n","        for item in train[user]:\n","            if item not in item_users:\n","                item_users[item] = []\n","            item_users[item].append(user)\n","    \n","    # 计算用户相似度矩阵\n","    sim = {}\n","    num = {}\n","    for item in item_users:\n","        users = item_users[item]\n","        for i in range(len(users)):\n","            u = users[i]\n","            if u not in num:\n","                num[u] = 0\n","            num[u] += 1\n","            if u not in sim:\n","                sim[u] = {}\n","            for j in range(len(users)):\n","                if j == i: continue\n","                v = users[j]\n","                if v not in sim[u]:\n","                    sim[u][v] = 0\n","                sim[u][v] += 1\n","    for u in sim:\n","        for v in sim[u]:\n","            sim[u][v] /= math.sqrt(num[u] * num[v])\n","    \n","    # 按照相似度排序\n","    sorted_user_sim = {k: list(sorted(v.items(), \\\n","                               key=lambda x: x[1], reverse=True)) \\\n","                       for k, v in sim.items()}\n","    \n","    # 获取接口函数\n","    def GetRecommendation(user):\n","        items = {}\n","        seen_items = set(train[user])\n","        for u, _ in sorted_user_sim[user][:K]:\n","            for item in train[u]:\n","                # 要去掉用户见过的\n","                if item not in seen_items:\n","                    if item not in items:\n","                        items[item] = 0\n","                    items[item] += sim[user][u]\n","        recs = list(sorted(items.items(), key=lambda x: x[1], reverse=True))[:N]\n","        return recs\n","    \n","    return GetRecommendation"]},{"cell_type": "code","execution_count": 8,"metadata": {},"outputs": [],"source": ["# 4. 基于改进的用户余弦相似度的推荐\n","def UserIIF(train, K, N):\n","    '''\n","    :params: train, 训练数据集\n","    :params: K, 超参数,设置取TopK相似用户数目\n","    :params: N, 超参数,设置取TopN推荐物品数目\n","    :return: GetRecommendation, 推荐接口函数\n","    '''\n","    # 计算item->user的倒排索引\n","    item_users = {}\n","    for user in train:\n","        for item in train[user]:\n","            if item not in item_users:\n","                item_users[item] = []\n","            item_users[item].append(user)\n","    \n","    # 计算用户相似度矩阵\n","    sim = {}\n","    num = {}\n","    for item in item_users:\n","        users = item_users[item]\n","        for i in range(len(users)):\n","            u = users[i]\n","            if u not in num:\n","                num[u] = 0\n","            num[u] += 1\n","            if u not in sim:\n","                sim[u] = {}\n","            for j in range(len(users)):\n","                if j == i: continue\n","                v = users[j]\n","                if v not in sim[u]:\n","                    sim[u][v] = 0\n","                # 相比UserCF,主要是改进了这里\n","                sim[u][v] += 1 / math.log(1 + len(users))\n","    for u in sim:\n","        for v in sim[u]:\n","            sim[u][v] /&#

http://www.ppmy.cn/ops/150627.html

相关文章

leetcode 面试经典 150 题:快乐数

链接快乐数题序号202题型数组解题方法哈希表难度简单熟练度✅✅✅✅ 题目 编写一个算法来判断一个数 n 是不是快乐数。 [快乐数] 定义为: 对于一个正整数,每一次将该数替换为它每个位置上的数字的平方和。 然后重复这个过程直到这个数变为 1&#xff0…

Unity 视频导入unity后,播放时颜色变得很暗很深,是什么原因导致?

视频正常播放时的颜色: 但是,当我在unity下,点击视频播放按钮时,视频的颜色立马变得十分昏暗: 解决办法: 将File—BuildSettings—PlayerSettings—OtherSettings下的Color Space改为:Gamma即可…

【Unity-Animator】通过 StateMachineBehaviour 实现回调

StateMachineBehaviour 简介 StateMachineBehaviour是一个基类,所有状态脚本都派生自该类。它可以在状态机进入、退出或更新状态时执行代码,而无需编写自己的逻辑来测试和检测状态的变化。这使得开发者可以更方便地处理状态转换时的逻辑,例…

设计出有利于后期扩展成高可用高并发的java单例应用的一些建议

将一个单例的 Java 应用程序设计成易于后期扩展为高并发高可用系统是一个重要的架构决策。以下是一些关键的设计原则和步骤,帮助你实现这一目标。 关键设计原则 模块化设计: 将应用分解为多个独立的服务或模块,便于单独扩展和维护。无状态设…

【LeetCode: 240. 搜索二维矩阵 II + 指针 + 遍历】

🚀 算法题 🚀 🌲 算法刷题专栏 | 面试必备算法 | 面试高频算法 🍀 🌲 越难的东西,越要努力坚持,因为它具有很高的价值,算法就是这样✨ 🌲 作者简介:硕风和炜,…

CES 2025:XEO 展出双眼8K VR头显,售价2000美元

在2025年国际消费类电子产品展览会(CES)上,XEO公司凭借其最新的VR头显设备吸引了众多目光。这款标榜双眼8K分辨率的高端VR头显不仅展示了当前虚拟现实技术的顶尖水平,同时也设定了新的行业标杆。本文将详细介绍这款产品的特点、技术和市场定位。 XEO 双眼8K VR头显:极致视…

《基于深度学习的多色光度巡天项目天文目标检测框架》论文精读

A deep learning based astronomical target detection framework for multi-colour photometry sky survey projects 摘要 多色测光巡天项目将利用广角望远镜和几种不同的滤光片获得不同颜色的天体图像。不同颜色的图像可以揭示天体的不同组成部分。我们将能够利用这些图像研…

探秘 JMeter (Interleave Controller)交错控制器:解锁性能测试的隐藏密码

嘿,小伙伴们!今天咱们要把 JMeter 里超厉害的 Interleave Controller(交错控制器)研究个透,让你从新手直接进阶成高手,轻松拿捏各种性能测试难题! 一、Interleave Controller 深度剖析 所属家族…