NLP(13)--文本分类任务

devtools/2024/9/23 14:29:09/

前言

仅记录学习过程,有问题欢迎讨论

情感分析,违规检测,商品评论打分
贝叶斯算法:

P(B1) = 结果为奇数
P(B2) = 结果为偶数
P(A) = 结果为5
P(A) = P(B1) * P(A|B1) + P(B2) * P(A|B2) = 1/2 1/3 + 1/20

支持向量机:less use now
svm尝试找一个最优的决策边界,来解决一个 二分类的问题解决不了线性不可分问题,只能以更高维来区分数据(升维 类似于bert中的 feed forward)使用 核函数解决高纬度向量内积问题

深度学习

TextRNN(LSTM):
 是RNN的变体,相比于tansformer,没那么复杂,东西没那么多
一定程度规避传统RNN会导致信息遗忘和梯度消失的问题
(把前向和当前的信息做一定筛选【门】后保存)
CNN(一维):
是包含一定的语序信息的
通过一维卷积对文本进行编码,
编码后的文本通过pooling转化为向量,用于分类
Bert
取 【CLS】token对应的向量整句话的向量求MAX/AVG pooling需要再接一层来实现想要的需求,需要微调
(但是准确率还是比RNN高)
数据稀疏问题(新数据测试不理想):
添加标注数据!!
构造训练样本
换模型
调整阈值,用召回率换准确率(两者概率相反)
减少样本类别
标签不均衡问题:
过采样:复制指定类别样本,可以重复
降采样:随机下采样,可以减少类别样本数量
多标签分类问题:
1.分解为多个独立的二分类
(分为多个模型来判断数据是否属于该类)
2.转化为多分类问题(同时属于13/12/23类别)

代码

bert实现多分类任务demo(优化得跑更多的数据,是真的慢。。)

"""
使用bert 实现一个多分类任务
判断 输入的句子属于哪个板块的数据
"""
import json
import osimport numpy as np
import torch
import torch.nn as nnfrom transformers import BertModel, BertTokenizer
from torch.utils.data import DataLoader, TensorDatasetclass PickModel(nn.Module):def __init__(self, input_dim, output_size, max_len=128):super(PickModel, self).__init__()self.bert = BertModel.from_pretrained(r"E:\Anlp\week6语言模型和预训练\bert-base-chinese", return_dict=False)self.linear = nn.Linear(input_dim, output_size)# 归一化算概率self.activation = torch.sigmoidself.dropout = nn.Dropout(0.4)self.pool = nn.MaxPool1d(max_len)self.loss = nn.functional.cross_entropydef forward(self, x, y=None):sequence_output, pool_output = self.bert(x)x = self.linear(sequence_output)x = self.pool(x.transpose(1, 2)).squeeze()  # input shape:(batch_size, sen_len, input_dim)y_pred = self.activation(x)if y is not None:return self.loss(y_pred, y)else:return y_pred# 构建 tag对应的向量set
# tag_dict = {}def build_dataset(corpus_path, simples_size):print("============")x = []y = []# 用来记录每次的tag 方便后续构建tag对应的向量# list = []# 加载中文分词 直接转化为向量 并且添加前后标记tokenizer = BertTokenizer.from_pretrained("E:\\Anlp\\week6语言模型和预训练\\bert-base-chinese")tag_dict = open("tag_dict.json", "r", encoding="utf8").read()# 读取tag_dict.json文件 变为字典tag_dict = json.loads(tag_dict)with open(corpus_path, encoding="utf8") as f:i = 0# 读取文件中随机某一行的数据lines = f.readlines()# 随机打乱数据np.random.shuffle(lines)for line in lines:if i < simples_size:print(line[:50])i += 1# loads操作字符串 load操作文件流line = json.loads(line)# 返回的张量类型(pytorch),是否填充,是否截取,都为128的长度content_input_ids = tokenizer.encode(str(line["content"]), max_length=128, pad_to_max_length=True,truncation=True)x.append(content_input_ids)# 记录有多少个tag 放入key# tag_dict[str(line["tag"])] = 1# list.append(str(line["tag"]))y.append(tag_dict.get(str(line["tag"])))# 对每个tag定义index 方便loss计算# for index, tag in enumerate(tag_dict.keys()):#     tag_dict[tag] = index# 把tag_dict的key,value保存到文件中# if not os.path.exists("tag_dict.json"):#     with open("tag_dict.json", "w", encoding="utf8") as f:#         json.dump(tag_dict, f, ensure_ascii=False)return torch.LongTensor(x), torch.LongTensor(y)# test
# build_dataset("E:\\Anlp\\week7文本分类问题\\data\\train_tag_news.json",100)def main():char_dim = 768epoch_num = 10simples_size = 200batch_num = 20# build_dataset("E:\\Anlp\\week7文本分类问题\\data\\train_tag_news.json", simples_size)x, y = build_dataset("E:\\Anlp\\week7文本分类问题\\data\\train_tag_news.json", simples_size)dataset = TensorDataset(x, y)dataiter = DataLoader(dataset, batch_num, shuffle=True)model = PickModel(char_dim, 18)  # 建立模型optim = torch.optim.Adam(model.parameters(), lr=1e-3)  # 建立优化器)for epoch in range(epoch_num):epoch_loss = []model.train()for x, y in dataiter:loss = model(x, y)loss.backward()optim.step()optim.zero_grad()epoch_loss.append(loss.item())# print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(epoch_loss)))print("==\n第%d轮,epoch—loss mean 为 %f" % (epoch + 1, np.mean(epoch_loss)))torch.save(model.state_dict(), "model0506.pth")returndef predict(model_path):# 建立模型model = PickModel(768, 18)model.load_state_dict(torch.load(model_path))model.eval()# sentence = input()# 加载中文分词 直接转化为向量 并且添加前后标记x, y_true = build_dataset("E:\\Anlp\\week7文本分类问题\\data\\valid_tag_news.json", 100)# tag_dict1 = open("tag_dict.json", "r", encoding="utf8").read()# 读取tag_dict.json文件 变为字典# tag_dict1 = json.loads(tag_dict1)correct, wrong = 0, 0with torch.no_grad():result = model(x)# 返回最大概率的indexy_pred = [torch.argmax(i) for i in list(result)]print(y_true)print(y_pred)# 返回最大概率的tagfor y_p, y in zip(y_pred, y_true):  # 与真实标签进行对比if int(y_p) == int(y):correct += 1  # 正样本判断正确else:wrong += 1print("正确预测个数:%d / %d, 正确率:%f" % (correct, correct + wrong, correct / (correct + wrong)))returnif __name__ == '__main__':# main()predict("model0506.pth")

tag_dict.json

{"文化": 0, "时尚": 1, "健康": 2, "教育": 3, "军事": 4, "股票": 5, "娱乐": 6, "游戏": 7, "科技": 8, "彩票": 9, "旅游": 10, "汽车": 11, "体育": 12, "家居": 13, "财经": 14, "国际": 15, "房产": 16, "社会": 17}

代码二

标准化流程实现分类任务
训练样本 1w+ 验证样本 1261 训练轮次15

RNN:正确率 80.8 1020/1261 4min40s

LSTM: 正确率 79.3 1001/260 4min43s

BERT: 正确率 82.3 跑第3轮结束耗时 8min40s

config.py 配置文件

"""
配置参数信息
"""
Config = {"model_path": "./output/","model_name": "model.pt","train_data_path": r"D:\NLP\video\第七周\data\train_simple.csv","valid_data_path": r"D:\NLP\\video\第七周\data\valid_simple.csv","vocab_path": r"D:\NLP\video\第七周\data\vocab.txt","model_type": "bert","use_bert": True,# 文本向量大小"char_dim": 128,# 文本长度"max_len": 50,# 词向量大小"hidden_size": 256,# 训练 轮数"epoch_size": 15,# 批量大小"batch_size": 25,# 训练集大小"simple_size": 300,# 学习率"lr": 0.001,# dropout"dropout": 0.5,# 优化器"optimizer": "adam",# 卷积核"kernel_size": 3,# 最大池 or 平均池"pooling_style": "max",# 模型层数"num_layers": 2,"bert_model_path": r"D:\NLP\video\第六周\bert-base-chinese",# 输出层大小"output_size": 2,# 随机数种子"seed": 987
}

load.py j加载数据文件

"""
数据加载
"""
import os
import numpy as np
import pandas as pd
import json
import re
import os
import torch
import torch.utils.data as Data
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer# 获取字表集
def load_vocab(path):vocab = {}with open(path, 'r', encoding='utf-8') as f:for index, line in enumerate(f):word = line.strip()# 0留给padding位置,所以从1开始vocab[word] = index + 1vocab['unk'] = len(vocab) + 1return vocab# 数据预处理 裁剪or填充
def padding(input_ids, length):if len(input_ids) >= length:return input_ids[:length]else:padded_input_ids = input_ids + [0] * (length - len(input_ids))return padded_input_ids# 文本预处理
# 转化为向量
def sentence_to_index(text, length, vocab):input_ids = []for char in text:input_ids.append(vocab.get(char, vocab['unk']))# 填充or裁剪input_ids = padding(input_ids, length)return input_idsclass DataGenerator:def __init__(self, data_path, config):self.data_path = data_pathself.config = configif self.config["model_type"] == "bert":self.tokenizer = BertTokenizer.from_pretrained(config["bert_model_path"])self.vocab = load_vocab(config["vocab_path"])self.config["vocab_size"] = len(self.vocab)self.data = self.load_data()def __len__(self):return len(self.data)def __getitem__(self, idx):return self.data[idx]def load_data(self):dataset_x = []dataset_y = []with open(self.data_path, 'r', encoding='utf-8') as f:for line in f:row = line.strip().split(',')# 第一列为标签,第二列为文本label = int(row[0])text = row[1]# 文本预处理if self.config["model_type"] == "bert":input_ids = self.tokenizer.encode(text, max_length=self.config["max_len"], pad_to_max_length=True)else:# 转化为对应的字表idinput_ids = sentence_to_index(text, self.config["max_len"], self.vocab)# 标签和文本组成一个样本dataset_x.append(input_ids)dataset_y.append(label)data = Data.TensorDataset(torch.tensor(dataset_x), torch.tensor(dataset_y))return data# 用torch自带的DataLoader类封装数据
def load_data_batch(data_path, config, shuffle=True):dg = DataGenerator(data_path, config)dl = DataLoader(dg.data, batch_size=config["batch_size"], shuffle=shuffle)return dlif __name__ == '__main__':from config import Configdg = DataGenerator(Config["train_data_path"], Config)print(len(dg))print(dg[0])

model.py 模型文件

import torch
import os
import random
import os
import numpy as np
import logging
from config import Config
from model import TorchModel, choose_optimizer
from loader import load_data_batch
from evaluate import Evaluator# [DEBUG, INFO, WARNING, ERROR, CRITICAL]logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)"""
模型训练主程序
"""
# 通过设置随机种子来复现上一次的结果(避免随机性)
seed = Config["seed"]
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)def main(config):# 保存模型的目录if not os.path.isdir(config["model_path"]):os.mkdir(config["model_path"])# 加载数据dataset = load_data_batch(config["train_data_path"], config)# 加载模型model = TorchModel(config)# 是否使用gpuif torch.cuda.is_available():logger.info("gpu可以使用,迁移模型至gpu")model.cuda()# 选择优化器optim = choose_optimizer(config, model)# 加载效果测试类evaluator = Evaluator(config, model, logger)for epoch in range(config["epoch_size"]):epoch += 1logger.info("epoch %d begin" % epoch)epoch_loss = []# 训练模型model.train()for batch_data in dataset:if torch.cuda.is_available():batch_data = [d.cuda() for d in batch_data]# x, y = dataiter# 反向传播optim.zero_grad()x, y = batch_data     # 输入变化时这里需要修改,比如多输入,多输出的情况# 计算梯度loss = model(x, y)# 梯度更新loss.backward()# 优化器更新模型optim.step()# 记录损失epoch_loss.append(loss.item())logger.info("epoch average loss: %f" % np.mean(epoch_loss))# 测试模型效果acc = evaluator.eval(epoch)# 可以用model_type model_path epoch 三个参数来保存模型model_path = os.path.join(config["model_path"], "epoch_%d_%s.pth" % (epoch, config["model_type"]))torch.save(model.state_dict(), model_path)  # 保存模型权重returnif __name__ == "__main__":main(Config)# for model in ["cnn"]:#     Config["model_type"] = model#     print("最后一轮准确率:", main(Config), "当前配置:", Config["model_type"])# 对比所有模型# 中间日志可以关掉,避免输出过多信息# 超参数的网格搜索# for model in ["gated_cnn"]:#     Config["model_type"] = model#     for lr in [1e-3, 1e-4]:#         Config["learning_rate"] = lr#         for hidden_size in [128]:#             Config["hidden_size"] = hidden_size#             for batch_size in [64, 128]:#                 Config["batch_size"] = batch_size#                 for pooling_style in ["avg"]:#                     Config["pooling_style"] = pooling_style# 可以把输出放入文件中 便于查看#                     print("最后一轮准确率:", main(Config), "当前配置:", Config)

evaluate.py 评估模型文件

"""
模型效果测试
"""
import torch
from loader import load_data_batchclass Evaluator:def __init__(self, config, model, logger):self.config = configself.model = modelself.logger = logger# 选择验证集合self.dataset = load_data_batch(config["valid_data_path"], config, shuffle=False)self.stats_dict = {"correct": 0, "wrong": 0}  # 用于存储测试结果def eval(self, epoch):self.logger.info("开始测试第%d轮模型效果:" % epoch)# 测试模式self.model.eval()self.stats_dict = {"correct": 0, "wrong": 0}  # 清空上一轮结果for index,batch_data in enumerate(self.dataset):if torch.cuda.is_available():batch_data = [d.cuda() for d in batch_data]x,y = batch_data  #输入变化时这里需要修改,比如多输入,多输出的情况with torch.no_grad():pred = self.model(x)# dim=1 表示在第一维上进行比较,即取一行的argmaxpred = [torch.argmax(i) for i in pred]for pred, y in zip(pred, y):# 预测正确if pred == y:self.stats_dict["correct"] += 1# 预测错误else:self.stats_dict["wrong"] += 1acc = self.show_stats()return accdef show_stats(self):correct = self.stats_dict["correct"]wrong = self.stats_dict["wrong"]self.logger.info("预测集合条目总量:%d" % (correct + wrong))self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong))self.logger.info("预测准确率:%f" % (correct / (correct + wrong)))self.logger.info("--------------------")return correct / (correct + wrong)

http://www.ppmy.cn/devtools/39662.html

相关文章

面试前端随笔20240510

最近公司招聘前端开发人员有幸参与帮听&#xff0c;总结了三个有关vue的面试问题和答案&#xff0c;现在分享一下。 1.Vue2数据监听无法监听数组为啥&#xff1f;有啥解决方案&#xff1f;vue3中是如何处理这个问题&#xff1f; vue2的官方说明了defineProperty的一些限制&…

程序设计实践-课程设计任务布置(麦当劳) (price 200)

code price 200 WX: help-assignment 课题任务-概述 2023年5月&#xff0c;麦当劳在北邮开业。大量的学生去那里订餐。正因为如此&#xff0c;麦当劳的在线点餐系统经常关闭以避免拥挤&#xff0c;尤其是在午餐和晚餐时间。该系统的关闭时间不确定。北邮的学生认为这非常麻烦…

【Leetcode】 top100 round2 需要加强版

知识补充 python赋值的执行顺序&#xff1a; 在41中&#xff0c;对于测试案例[-1,4,3,1] 当i1时&#xff0c;以下两条语句的执行结果不一致&#xff1a; “nums[nums[i]-1], nums[i] nums[i], nums[nums[i]-1]” “nums[i], nums[nums[i]-1] nums[nums[i]-1], nums[i]” 解析…

ok_Keil实用小技巧 | Keil定制Hex文件名实现的方法

Keil实用小技巧 | Keil定制Hex文件名实现的方法 echo off REM 可执行文件&#xff08;Hex&#xff09;文件名 set HEX_NAMEDemo REM 可执行文件&#xff08;Hex&#xff09;文件路径 set HEX_PATH.\Objects REM 定制Hex输出路径 set OUTPUT_PATH.\Output REM 软件版本文件…

(文章复现)基于变异粒子群算法的主动配电网故障恢复策略

参考文献&#xff1a; [1]徐岩,张荟,孙易洲.基于变异粒子群算法的主动配电网故障恢复策略[J].电力自动化设备,2021,41(12):45-53.DOI:10.16081/j.epae.202108030. 1.基本原理 为提高主动配电网故障恢复的快速性和可靠性&#xff0c;提出一种基于变异粒子群算法的恢复策略。光…

Tarjan----寻找最近公共祖先(LCA) 板子

一、Tarjan算法作用&#xff1a; Tarjan算法是一种用于寻找图中节点的最近公共祖先&#xff08;LCA&#xff09;的算法。该算法通过深度优先搜索&#xff08;DFS&#xff09;遍历图&#xff0c;并使用并查集&#xff08;Union-Find&#xff09;数据结构来快速找到两个节点的最近…

[YOLOv8] 用YOLOv8实现指针式圆形仪表智能读数(三)

最近研究了一个项目&#xff0c;利用python代码实现指针式圆形仪表的自动读数&#xff0c;并将读数结果进行输出&#xff0c;若需要完整数据集和源代码可以私信。 目录 &#x1f353;&#x1f353;1.yolov8实现圆盘形仪表智能读数 &#x1f64b;&#x1f64b;2.表盘智能读数…

WPS二次开发系列:如何使用WPS返回的FileUri

作者持续关注 WPS二次开发专题系列&#xff0c;持续为大家带来更多有价值的WPS开发技术细节&#xff0c;如果能够帮助到您&#xff0c;请帮忙来个一键三连&#xff0c;更多问题请联系我&#xff08;QQ:250325397&#xff09; 目录 什么是FileUri 在SDK中的使用场景 打开文档时…