实验16 基于双向LSTM和注意力机制完成文本分类实验

ops/2024/12/22 10:03:58/

一 实验原理

双向LSTM:

对于文本的情感分类,取决于过于和未来的上下文,需要用到双向的LSTM,通过反向更新的隐藏层来获得方向时间信息。

双向LSTM是在传统LSTM的基础上进行扩展,它通过两个LSTM网络分别对输入序列进行正向和反向处理,从而捕捉序列中的上下文信息。

正向LSTM:从序列的第一个词开始处理,依次传递信息到最后一个词。

反向LSTM:从序列的最后一个词开始处理,逆序传递信息到第一个词。

这两个LSTM网络的输出会被合并(通常是拼接或加权求和),以便在每个时间步捕捉到正向和反向的上下文信息。

双向LSTM进行文本情感分类模型

因为我们要处理的是文本内容情感分类,但是文本不能作为模型的输入,所以需要创建一个词表将单词转换为数字,词表里里面含了单词和对应的索引,类似于图2。根据词表把一段句子转换为数字,比如说“this movie is nice”可以转换为数字“1,2,3,4”。因为这个实验直接调用函数LSTM,LSTM的输入必须是batch_size*seq_len*input_size,将输入传入模型之后直接得到每个时间步的隐层输出,其中seq_len决定了LSTM需要几个时间步,所以对于同一个批次的数据来说,输入的长度必须是一样的,比如“this movie is nice”和“this movie is very very nice”变为数字索引后是[1,2,3,4]和[1,2,3,5,5,4],如果是同一个批次的输入的话,那么就必须把前一个句子的长度拉长,变为“[1,2,3,4,0,0]”。

处理好数据之后,利用torch的嵌入函数进行嵌入,输入是batch_size*seq_len,得到的结果大小是batch_size*seq_len*embedding_dim(我理解就是对于每个单词来说,从维度1变成了维度embedding_dim,从一个数字变成了向量)。

然后直接调用双向LSTM函数,其实是得到了正向的隐层输出和反向的隐层输出,只看一个方向的输出的话,所有隐层的输出大小是batch_size*seq_len*hidden_size,看两个方向的话,隐层的输出大小是batch_size*seq_len*hidden_size*2。

汇聚层是将大小从batch_size*seq_len*hidden_size*2变为batch_size*hidden_size*2,这一块我一开始不理解,研究发现就是batch_size就是句子的个数,seq_len是句子中单词的数量,hidden_size是每个单词的向量表示维度。汇聚层就是沿着seq_len的方向,将每个维度进行加和平均,使seq_len变为1,将LSTM网络得到的序列输出整合成一个固定维度的向量,以便进行后续的分类,因为神经网络的输出层通常需要固定维度的输入。

最后一个全连接层就是分类问题模型的经典结尾,经过全连接函数得到的结果是batch_size*num_classes,每一行是样本在积极消极类别上的logits。

 注意力:

在传统的聚合方法中,如最大池化或平均池化,所有的时间步的输出通常是等权重的,即每个时间步对最终输出的贡献是一样的。但在实际应用中,某些时间步的输出比其他时间步更为重要,特别是在情感分析任务中。例如“this movie is nice”中nice对于情感分类来说比其他单词重要。这时,注意力机制能够发挥作用,它通过学习为每个时间步分配不同的权重,从而能够聚焦于那些与情感分类最相关的部分。

双向LSTM的输出是一个时间步序列,而注意力机制则通过为每个时间步分配一个权重,动态地加权这些输出,从而得到一个加权的上下文向量作为文本的最终表示。

注意力层的计算过程是:首先生成一个查询向量q。

①根据q和每个时间步通过计算得到注意力分数,(在本次使用中分别用到了加性模型和点积模型计算注意力分数)。

加性模型:

s\left( X,q \right) =v^T\tan\text{h}\left( XW+q^TU \right)

X是每个时间步的隐层输出,的大小是batch_size*seq_len*hidden_size*2,q是查询向量,大小是hidden_size2,其余为可学习参数。得到的注意力分数的大小为batch_size*seq_len,每一行都是一个句子中各个单词的注意力分数。

点积模型:

 s\left( X,q \right) =Xq

②然后经过softmax得到每个时间步注意力分布。

M是掩码(mask)矩阵[PAD]位置的元素值置为-1e9,其它位置的元素值置为0,[PAD]对应的Softmax输出变成了0,相当于把[PAD]这些没有特殊意义字符给屏蔽了,然后剩下元素计算注意力分布,这样做就减少了这些没有特殊意义单元对于注意力计算的影响。

③然后将注意力分布作为每个时间步的权重进行加权求和

z=\underset{n=1}{\overset{N}{\varSigma}}a_nx_n

最终z的大小是batch_size*hidden_size*2. 

基于双向LSTM模型完成文本分类任务

1.数据处理

数据处理的目的就是加载数据集,划分为训练集、验证集和测试集,然后利用词表将文本转换为对应的数字索引,对过长的文本进行裁剪,对长度不足的文本进行填充,最后转换为dataloader。

①首先加载训练集和测试集,然后利用切片操作将测试集划分为验证集和测试集。

②加载词汇表,数据集中提供了词汇表,但是里面最后单词没有索引,我在词汇表的最上面加入了两个新词汇[PAD][UNK],分别表示填充和词汇表中没出现的词。之后,编写load_vocab函数,里面先定义一个空的字典,然后以只读方式打开词汇文件,遍历文件的每一行(每一行是一个单词),利用strip函数删除每一行末尾的空白字符,利用字典将单词映射为索引word2id[word] = idx,idx从0开始。最终返回生成的字典,字典的键是单词,值是索引。

③创建IMDBDataset函数,用于将data转换为dataset,函数接收的参数是data和词表。函数里面除了包含必须存在的init函数、getitem函数和len函数,还有一个words_to_id函数。init函数中定义了词表并赋值为传入的词表,定义了examples赋值为转换为数字之后的data。words_to_id函数中首先定义一个空列表,遍历data(data是文本+标签),对于data中的每一个样本,将文本中的词转化为词典中的idx得到seq,然后将seq和label合成一个元组加入到空列表中,最后返回的就是一串数字索引+标签。getitem方法就是返回某个idx的内容,len函数就是返回data的样本个数。

创建完IMDBDataset函数之后就是实例化,得到train_set、dev_set和test_set。

④编写collate_fn,在创建dataloader时传入这个函数,以使提取出来的batch_size个样本的长度一致。这个函数接受的参数是batch_data(批量数据组成的列表)、填充值和序列最大长度。返回值是填充后的序列、序列真实长度和标签。在函数中,首先定义空列表seqs用于存储截断后的序列,空列表labels用于存储真实标签,空列表seq_len用于存储序列的长度。遍历batch_data的每个样本,获取seq和label,对seq进行切片操作,如果seq的长度超过最大长度就截短为最大长度。然后将序列、标签和序列长度都append到对应的列表。遍历结束后,利用max函数获得当前批次的最大长度max_len。下面就该对长度没到max_len的进行填充,填充的值是[PAD]对应的idx。最后把填充后的序列、序列的真实长度和标签转换为tensor后返回。

⑤将dataset转换为dataloader,在定义Dataloader时,把collate_fn传递进去,这样DataLoader 在加载数据时就会调用这个函数来对数据进行截断填充。

代码:

# 每个类别文件夹下都包含多个 txt 文件
def load_data_from_folder(folder):data = []for label, subfolder in enumerate(['neg', 'pos']):folder_path = os.path.join(folder, subfolder)for filename in os.listdir(folder_path):file_path = os.path.join(folder_path, filename)with open(file_path, 'r', encoding='utf-8') as file:text = file.read()data.append((text, str(label)))  # 合并文本和标签,格式化为 ("文本", '标签')return data# 读取训练集
train_data = load_data_from_folder(train_dir)
# 读取测试集
test_data = load_data_from_folder(test_dir)# 将测试集分割为验证集和测试集
dev_size = len(test_data) // 2
dev_data, test_data = test_data[:dev_size], test_data[dev_size:]# 查看结果
print(f"训练集大小: {len(train_data)}")
print(f"验证集大小: {len(dev_data)}")
print(f"测试集大小: {len(test_data)}")# 自定义Dataset类,用于将词汇转换为索引
class IMDBDataset(Dataset):def __init__(self, examples, word2id_dict):""":param examples: [(text, label), ...] 格式的数据:param word2id_dict: 词汇表,词汇 -> 索引ID"""self.word2id_dict = word2id_dictself.examples = self.words_to_id(examples)def words_to_id(self, examples):"""将文本中的词转换为词典中的ID,未知词用[UNK]的ID替代"""tmp_examples = []for seq, label in examples:seq = [self.word2id_dict.get(word, self.word2id_dict['[UNK]']) for word in seq.split()]label = int(label)tmp_examples.append((seq, label))return tmp_examplesdef __getitem__(self, idx):return self.examples[idx]def __len__(self):return len(self.examples)# 加载词汇表
def load_vocab(filepath):word2id = {}with open(filepath, 'r', encoding='utf-8') as f:for idx, line in enumerate(f):word = line.strip()word2id[word] = idxreturn word2idword2id_dict = load_vocab("aclImdb/imdb.vocab")# 实例化Dataset
train_set = IMDBDataset(train_data, word2id_dict)
dev_set = IMDBDataset(dev_data, word2id_dict)
test_set = IMDBDataset(test_data, word2id_dict)print('训练集样本数:', len(train_set))
print('样本示例:', train_set[4])
def collate_fn(batch_data, pad_val, max_seq_len=256):""":param batch_data: 批量数据 (seq, label) 组成的列表:param pad_val: 填充值,[PAD]对应的ID:param max_seq_len: 序列最大长度:return: 填充后的序列、序列真实长度和标签"""seqs, labels, seq_lens = [], [], []for seq, label in batch_data:# 截断序列seq = seq[:max_seq_len]seq_lens.append(len(seq))seqs.append(seq)labels.append(label)# 获取当前批次的最大长度max_len = max(seq_lens)# 填充序列padded_seqs = [seq + [pad_val] * (max_len - len(seq)) for seq in seqs]# 转换为Tensorpadded_seqs = torch.tensor(padded_seqs, dtype=torch.long)seq_lens = torch.tensor(seq_lens, dtype=torch.long)labels = torch.tensor(labels, dtype=torch.long)return (padded_seqs, seq_lens), labelsfrom functools import partial# 设置参数
max_seq_len = 256
batch_size = 128# collate_fn函数
collate_fn = partial(collate_fn, pad_val=word2id_dict["[PAD]"], max_seq_len=max_seq_len)# 构建DataLoader
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, drop_last=False, collate_fn=collate_fn)
dev_loader = DataLoader(dev_set, batch_size=batch_size, shuffle=False, drop_last=False, collate_fn=collate_fn)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, drop_last=False, collate_fn=collate_fn)

2.模型构建

首先在init函数中实例化模型需要用到的4个函数,分别是嵌入函数、LSTM函数、聚合函数和全连接函数。

①嵌入层,直接使用pytorch中预定义函数nn.Embedding,同时传入两个参数,分别是词典大小num_embeddings和嵌入维度input_size。

②双向LSTM模型,直接调用nn.LSTM,因为是双向,所以在定义时传入参数bidirectional=True,然后定义LSTM还要定义输入维度和隐层维度。

③聚合层,双向LSTM得到的输出大小是batch_size*seq_len*hidden_size*2,聚合层的目标就是让seq_len变为1,计算序列的平均值,同时忽略掉pad部分。因为在创建Dataloader的时候,让一个批次的样本的长度一致,所以有的样本用pad进行了填充,那我们在聚合也就是seq_len变为1的时候,要忽略这些pad。编写了函数AveragePooling实现了这个功能。

④全连接层,这个就是分类问题模型的结尾了,全连接层的输入维度是hidden_size*2,输出维度是num_classes,这个问题是一个二分类问题,所以num_classes就是2。

forward函数:对数据集进行处理,输出文本分类logits

input是序列+seq_len组成的元组,序列数据的大小是batch_size*seq_len,此时的一个样本中的序列是一段数字,每个数字代表一个单词。首先将其放入嵌入函数,获得词向量,此时数据的大小变为batch_size*seq_len*embedding_dim,就是把每个数字的维度从1扩展到embedding_dim维。

将获得的词向量放到LSTM模型,得到的输出大小为batch_size*seq_len*2*hidden_size,我认为LSTM类似于全连接函数,因为经过LSTM之后数据的维度从embedding_dim变成了2*hidden_size,只是LSTM相比全连接多了时间步和之前的记忆。

将LSTM获得的结果和序列真实长度放到聚合层,将seq_len变为1,计算序列的平均值。

最后将结果放到全连接函数,得到的结果大小为batch_size*2,每一行的两个数代表了样本属于积极消极的logits。

代码:

#模型构建
import torch
import torch.nn as nnclass AveragePooling(nn.Module):def __init__(self):super(AveragePooling, self).__init__()def forward(self, sequence_output, sequence_length):# 将 sequence_length 变为 float,并扩展维度sequence_length = sequence_length.unsqueeze(-1).float()# 根据 sequence_length 生成 mask 矩阵max_len = sequence_output.size(1)mask = torch.arange(max_len, device=sequence_output.device).unsqueeze(0) < sequence_lengthmask = mask.float().unsqueeze(-1)# 对序列中 padding 部分进行 masksequence_output = sequence_output * mask# 对序列中的向量取均值batch_mean_hidden = sequence_output.sum(dim=1) / sequence_lengthreturn batch_mean_hiddenclass Model_BiLSTM_FC(nn.Module):def __init__(self, num_embeddings, input_size, hidden_size, num_classes=2):super(Model_BiLSTM_FC, self).__init__()# 词典大小self.num_embeddings = num_embeddings# 单词向量的维度self.input_size = input_size# LSTM隐藏单元数量self.hidden_size = hidden_size# 情感分类类别数量self.num_classes = num_classes# 实例化嵌入层self.embedding_layer = nn.Embedding(num_embeddings, input_size, padding_idx=0)# 实例化LSTM层# PyTorch中LSTM支持双向设置bidirectional=Trueself.lstm_layer = nn.LSTM(input_size, hidden_size, bidirectional=True, batch_first=True)# 实例化聚合层self.average_layer = AveragePooling()# 实例化输出层self.output_layer = nn.Linear(hidden_size * 2, num_classes)def forward(self, inputs):# 对模型输入拆分为序列数据和mask,序列数据的大小是batch_size*seq_leninput_ids, sequence_length = inputs# 获取词向量 嵌入之后的大小变为batch_size*seq_len*embedding_diminputs_emb = self.embedding_layer(input_ids)# 使用LSTM处理数据,LSTM的输出是 (batch_size, seq_len, 2 * hidden_size)sequence_output, _ = self.lstm_layer(inputs_emb)# 使用聚合层聚合sequence_output ,聚合得到的大小是batch_size*hidden_size*2batch_mean_hidden = self.average_layer(sequence_output, sequence_length)# 输出文本分类logitslogits = self.output_layer(batch_mean_hidden)return logits

3.模型训练

首先定义模型中需要的参数,分别是词表长度num_beddings,输入维度和隐层维度。然后实例化模型为model,损失函数使用交叉熵损失,优化器使用Adam。

Runner类直接导入之前的Runner类,实例化runner类。

调用runner.train函数开始训练,记录每个epoch在训练集和测试集上损失的变化,同时保存最优模型。

代码:

#模型训练
# 设置随机种子
np.random.seed(0)          # 设置NumPy随机种子
random.seed(0)             # 设置Python内置随机库的种子
torch.manual_seed(0)       # 设置CPU的随机种子
# 指定训练轮次
num_epochs = 3
# 指定学习率
learning_rate = 0.001
# 指定embedding的数量为词表长度
num_embeddings = len(word2id_dict)
# embedding向量的维度
input_size = 256
# LSTM网络隐状态向量的维度
hidden_size = 256# 实例化模型
model = Model_BiLSTM_FC(num_embeddings, input_size, hidden_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)# 实例化Runner
runner = Runner(model, train_loader, dev_loader, test_loader, criterion, optimizer)
# 模型训练
start_time = time.time()
runner.train(epochs=600)  # 训练模型
end_time = time.time()

4.模型评价

调用runner.test函数进行模型测试,得到在最优模型上的准确率。

代码:

runner.test()  # 测试模型

5.模型预测

定义一个电影评价为text,将文本按照词表转换为索引,设置最大长度为max_seq_len,将其进行截断,利用len函数获得长度,然后对样本添加batch维度。然后将样本和样本长度传入到predict函数,返回值是在两个类别上的logits,然后利用argmax函数获得最大值索引,然后将索引根据映射字典id2label = {0: "消极情绪", 1: "积极情绪"}转换为标签输出。

代码:

# 假设已有的映射字典
id2label = {0: "消极情绪", 1: "积极情绪"}
max_seq_len = 128  # 最大序列长度# 输入的文本
text = "this movie is so great. I watched it three times already"# 处理单条文本
sentence = text.split(" ")
words = [word2id_dict[word] if word in word2id_dict else word2id_dict['[UNK]'] for word in sentence]
words = words[:max_seq_len]  # 截断到最大长度
sequence_length = torch.tensor([len(words)], dtype=torch.int64)
words = torch.tensor(words, dtype=torch.int64).unsqueeze(0)  # 添加 batch 维度# 模型预测(假设 runner.predict 在 PyTorch 中实现为 model.forward)
# 假设 model 是你的 PyTorch 模型
logits = runner.predict((words, sequence_length))  # 调用模型的 forward 方法
max_label_id = torch.argmax(logits, dim=-1).item()  # 获取最大值索引
pred_label = id2label[max_label_id]  # 转换为对应标签print("Label: ", pred_label)

三 基于双向LSTM和注意力机制的文本分类

和(一)只有模型构建中的聚合层不同,其余均一样。

模型构建

(1)计算注意力得分

(1)计算注意力得分

①使用加性模型计算注意力分数

编写函数AdditiveScore实现使用加性模型计算了注意力分数。

公式为:s\left( X,q \right) =v^T\tan\text{h}\left( XW+q^TU \right)

XW这种特征矩阵和参数矩阵的乘积可以直接使用全连接层实现。

init函数:首先定义了三个全连接函数分别代表W、U、v,其中W的输入大小是hidden_size,输出是hidden_size;U的输入是hidden_size,输出是hidden_size;v的输入是hidden_size,输出是1。然后使用torch.rand随机初始化一个大小为hidden_size*1的查询向量。

forward函数:对于输入input,利用全连接层构造公式得到注意力得分score,然后利用.squeeeze(-1)将形状变为batch_size*seq_len,最终函数返回的结果为注意力得分score。

代码:

class AdditiveScore(nn.Module):def __init__(self, hidden_size):super(AdditiveScore, self).__init__()self.fc_W = nn.Linear(hidden_size, hidden_size, bias=False)self.fc_U = nn.Linear(hidden_size, hidden_size, bias=False)self.fc_v = nn.Linear(hidden_size, 1, bias=False)# 查询向量使用均匀分布随机初始化self.q = nn.Parameter(torch.rand(1, hidden_size) * 0.5 - 0.25)  # Uniform initialization [-0.5, 0.5]def forward(self, inputs):"""输入:inputs: [batch_size, seq_len, hidden_size]输出:scores: [batch_size, seq_len]"""batch_size, seq_len, hidden_size = inputs.shape# 计算加性注意力得分scores = torch.tanh(self.fc_W(inputs) + self.fc_U(self.q))# 将 scores 映射到一个标量,输出形状为 [batch_size, seq_len]scores = self.fc_v(scores).squeeze(-1)return scores

②使用点积模型计算注意力分数

编写函数DotProductScore实现使用点积模型计算注意力分数。

init函数:使用torch.rand随机初始化一个大小为hidden_size*1的查询向量q。

forward函数:处理的input是lstm的输出,大小为batch_szie*seq_len*hidden_size*2,利用shape函数获得input的形状,然后调用torch.matmul函数计算input和查询向量q的矩阵乘法,得到注意力得分score,形状为batch_size*seq_len*1,然后利用.squeeeze(-1)将形状变为batch_size*seq_len,最终函数返回的结果为注意力得分score。

代码:
 

class DotProductScore(nn.Module):def __init__(self, hidden_size):super(DotProductScore, self).__init__()# 使用均匀分布随机初始化一个查询向量self.q = nn.Parameter(torch.rand(hidden_size, 1) * 0.5 - 0.25)  # Uniform initialization [-0.5, 0.5]def forward(self, inputs):"""输入:inputs: [batch_size, seq_len, hidden_size]输出:scores: [batch_size, seq_len]"""batch_size, seq_len, hidden_size = inputs.shape# 计算点积得分: scores shape: [batch_size, seq_len, 1]scores = torch.matmul(inputs, self.q)# 将 scores 转换为 [batch_size, seq_len]scores = scores.squeeze(-1)return scores

(2)计算注意分布

注意力得分的大小是batch_size*seq_len,但是seq_len不一定就是文本的真实长度,可能句子是“this movie is nice”,经过填充变为了“this movie is nice pad pad”,转换为数字是[1,2,3,4,0,0],计算得到的注意力得分是0.31750774 0.52375913 0.81493020 0.84624285 0.84624285 0.76624285如果直接对其进行softmax是不合理的,因为pad是对于我们判断清晰分类是没有意义的,所以有一个方法是注意力得分score与掩码矩阵相加,掩码矩阵为0 0 0 0 -1e9 -1e9 相加之后计算softmax得到的结果就是0.17952277 0.22064464 0.2952211 0.30461147 0. 0我们可以发现对于pad的注意力得分是0,在后面进行加权求和的时候就可以忽略掉pad了。

具体实现为:首先利用torch.arange函数生成长度是seq_len的列表arrange,例如seq_len=4, arrange 变为 [0, 1, 2, 3],然后利用arrange < valid_lens生成一个掩码mask,对于每个序列,位置索引小于有效长度的位置会被标记为True(表示有效),其余位置为False(表示无效,即填充部分)。生成一个与scores形状相同的张量y,所有元素都被初始化为-1e9(负无穷大)。最后利用torch.where函数,里面的参数分别是mask、scores和y,根据mask的值选择scores或y中的元素。对于mask为True的位置,保持原scores的值;对于mask为False的位置,用-1e9替换。这样,无效部分的分数就变成了负无穷大。
最后利用就可以对转换之后的score计算softmax,得到的结果为每个时间步的权重,大小是batch_size*seq_len。

代码:

#计算注意力分布# arrange: [1, seq_len], 比如 seq_len=4, arrange 变为 [0, 1, 2, 3]arrange = torch.arange(scores.shape[1], dtype=torch.float32, device=scores.device).unsqueeze(0)# valid_lens : [batch_size, 1]valid_lens = valid_lens.unsqueeze(1)# mask [batch_size, seq_len]mask = arrange < valid_lens# 设定mask为无效部分的分数为负无穷y = torch.full(scores.shape, float('-1e9'), dtype=scores.dtype, device=scores.device)# 使用mask处理scoresscores = torch.where(mask, scores, y)# attn_weights: [batch_size, seq_len]attn_weights = F.softmax(scores, dim=-1)self._attention_weights = attn_weights

(3)加权求和

利用批量矩阵乘法函数torch.bmm函数进行计算注意力得分score和LSTM输出X的加权求和,结果的大小是batch_size*hidden_size。

代码:

#加权求和# context: [batch_size, 1, hidden_size]context = torch.bmm(attn_weights.unsqueeze(1), X)# context: [batch_size, hidden_size]context = context.squeeze(1)

(4)注意力机制完整代码

创建了Attention类,接受的参数是hidden_size和use_additive,其中use_additive=True表示使用加性模型,use_additive=False表示使用点积模型。

在init函数中,首先定义一个变量use_additive接受参数,然后通过判断use_additive是true还是false,进而定义变量scores等于函数AdditiveScore或函数DotProductScore。

forward函数中首先利用函数score计算注意力分布,然后通过(2)计算注意力分布,最后通过(3)进行加权求和。

代码:

class Attention(nn.Module):def __init__(self, hidden_size, use_additive=False):super(Attention, self).__init__()self.use_additive = use_additive# 使用加性模型或者点积模型if self.use_additive:self.scores = AdditiveScore(hidden_size)else:self.scores = DotProductScore(hidden_size)self._attention_weights = Nonedef forward(self, X, valid_lens):"""输入:- X:输入矩阵,shape=[batch_size, seq_len, hidden_size]- valid_lens:长度矩阵,shape=[batch_size]输出:- context:输出矩阵,表示的是注意力的加权平均的结果"""# 计算注意力得分: [batch_size, seq_len]scores = self.scores(X)#计算注意力分布# arrange: [1, seq_len], 比如 seq_len=4, arrange 变为 [0, 1, 2, 3]arrange = torch.arange(scores.shape[1], dtype=torch.float32, device=scores.device).unsqueeze(0)# valid_lens : [batch_size, 1]valid_lens = valid_lens.unsqueeze(1)# mask [batch_size, seq_len]mask = arrange < valid_lens# 设定mask为无效部分的分数为负无穷y = torch.full(scores.shape, float('-1e9'), dtype=scores.dtype, device=scores.device)# 使用mask处理scoresscores = torch.where(mask, scores, y)# attn_weights: [batch_size, seq_len]attn_weights = F.softmax(scores, dim=-1)self._attention_weights = attn_weights#加权求和# context: [batch_size, 1, hidden_size]context = torch.bmm(attn_weights.unsqueeze(1), X)# context: [batch_size, hidden_size]context = context.squeeze(1)return context@propertydef attention_weights(self):return self._attention_weights

(5)注意力机制加到文本情感分类问题

将第一个实验进行文本情感分类的模型中聚合层替换为注意力模型Attention即可。

代码:

class Model_LSTMAttention(nn.Module):def __init__(self,hidden_size,embedding_size,vocab_size,n_classes=10,n_layers=1,use_additive=False,):super(Model_LSTMAttention, self).__init__()self.hidden_size = hidden_sizeself.embedding_size = embedding_sizeself.vocab_size = vocab_sizeself.n_classes = n_classesself.n_layers = n_layers# Define embedding layerself.embedding = nn.Embedding(self.vocab_size, self.embedding_size)# Define LSTMself.lstm = nn.LSTM(input_size=self.embedding_size,hidden_size=self.hidden_size,num_layers=self.n_layers,bidirectional=True,batch_first=True,)# Define Attention layerself.attention = Attention(self.hidden_size * 2, use_additive)# Define classification layerself.cls_fc = nn.Linear(self.hidden_size * 2, self.n_classes)def forward(self, inputs):input_ids, valid_lens = inputsbatch_size = input_ids.size(0)# Get word embeddingsembedded_input = self.embedding(input_ids)# Get LSTM outputlstm_output, (last_hiddens, last_cells) = self.lstm(embedded_input)# Apply Attention mechanismlstm_output = self.attention(lstm_output, valid_lens)# Get logits (raw class predictions)logits = self.cls_fc(lstm_output)return logits

完整代码:

# 数据路径
import os
import random
import timeimport numpy as np
import torch
from torch import optim
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict
from runner import Runner
import torch
import torch.nn as nn
import torch.nn.functional as F
data_dir = 'aclImdb'
train_dir = os.path.join(data_dir, 'train')
test_dir = os.path.join(data_dir, 'test')# 每个类别文件夹下都包含多个 txt 文件
def load_data_from_folder(folder):data = []for label, subfolder in enumerate(['neg', 'pos']):folder_path = os.path.join(folder, subfolder)for filename in os.listdir(folder_path):file_path = os.path.join(folder_path, filename)with open(file_path, 'r', encoding='utf-8') as file:text = file.read()data.append((text, str(label)))  # 合并文本和标签,格式化为 ("文本", '标签')return data# 读取训练集
train_data = load_data_from_folder(train_dir)
# 读取测试集
test_data = load_data_from_folder(test_dir)# 将测试集分割为验证集和测试集
dev_size = len(test_data) // 2
dev_data, test_data = test_data[:dev_size], test_data[dev_size:]# 查看结果
print(f"训练集大小: {len(train_data)}")
print(f"验证集大小: {len(dev_data)}")
print(f"测试集大小: {len(test_data)}")# 自定义Dataset类
class IMDBDataset(Dataset):def __init__(self, examples, word2id_dict):""":param examples: [(text, label), ...] 格式的数据:param word2id_dict: 词汇表,词汇 -> 索引ID"""self.word2id_dict = word2id_dictself.examples = self.words_to_id(examples)def words_to_id(self, examples):"""将文本中的词转换为词典中的ID,未知词用[UNK]的ID替代"""tmp_examples = []for seq, label in examples:seq = [self.word2id_dict.get(word, self.word2id_dict['[UNK]']) for word in seq.split()]label = int(label)tmp_examples.append((seq, label))return tmp_examplesdef __getitem__(self, idx):return self.examples[idx]def __len__(self):return len(self.examples)# 加载词汇表
def load_vocab(filepath):word2id = {}with open(filepath, 'r', encoding='utf-8') as f:for idx, line in enumerate(f):word = line.strip()word2id[word] = idxreturn word2idword2id_dict = load_vocab("aclImdb/imdb.vocab")# 实例化Dataset
train_set = IMDBDataset(train_data, word2id_dict)
dev_set = IMDBDataset(dev_data, word2id_dict)
test_set = IMDBDataset(test_data, word2id_dict)print('训练集样本数:', len(train_set))
print('样本示例:', train_set[4])
def collate_fn(batch_data, pad_val, max_seq_len=256):""":param batch_data: 批量数据 (seq, label) 组成的列表:param pad_val: 填充值,[PAD]对应的ID:param max_seq_len: 序列最大长度:return: 填充后的序列、序列真实长度和标签"""seqs, labels, seq_lens = [], [], []for seq, label in batch_data:# 截断序列seq = seq[:max_seq_len]seq_lens.append(len(seq))seqs.append(seq)labels.append(label)# 获取当前批次的最大长度max_len = max(seq_lens)# 填充序列padded_seqs = [seq + [pad_val] * (max_len - len(seq)) for seq in seqs]# 转换为Tensorpadded_seqs = torch.tensor(padded_seqs, dtype=torch.long)seq_lens = torch.tensor(seq_lens, dtype=torch.long)labels = torch.tensor(labels, dtype=torch.long)return (padded_seqs, seq_lens), labels
from functools import partial# 设置参数
max_seq_len = 256
batch_size = 128# collate_fn函数
collate_fn = partial(collate_fn, pad_val=word2id_dict["[PAD]"], max_seq_len=max_seq_len)# 构建DataLoader
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, drop_last=False, collate_fn=collate_fn)
dev_loader = DataLoader(dev_set, batch_size=batch_size, shuffle=False, drop_last=False, collate_fn=collate_fn)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, drop_last=False, collate_fn=collate_fn)# 测试DataLoader
for batch_idx, ((seqs, seq_lens), labels) in enumerate(train_loader):print(f"批次 {batch_idx + 1}:")print("序列: ", seqs)print("序列长度: ", seq_lens)print("标签: ", labels)break  # 只测试一个批次#模型构建
import torch
import torch.nn as nnimport torch
import torch.nn as nnclass AdditiveScore(nn.Module):def __init__(self, hidden_size):super(AdditiveScore, self).__init__()self.fc_W = nn.Linear(hidden_size, hidden_size, bias=False)self.fc_U = nn.Linear(hidden_size, hidden_size, bias=False)self.fc_v = nn.Linear(hidden_size, 1, bias=False)# 查询向量使用均匀分布随机初始化self.q = nn.Parameter(torch.rand(1, hidden_size) * 0.5 - 0.25)  # Uniform initialization [-0.5, 0.5]def forward(self, inputs):"""输入:inputs: [batch_size, seq_len, hidden_size]输出:scores: [batch_size, seq_len]"""batch_size, seq_len, hidden_size = inputs.shape# 计算加性注意力得分scores = torch.tanh(self.fc_W(inputs) + self.fc_U(self.q))# 将 scores 映射到一个标量,输出形状为 [batch_size, seq_len]scores = self.fc_v(scores).squeeze(-1)return scoresimport torch
import torch.nn as nnclass DotProductScore(nn.Module):def __init__(self, hidden_size):super(DotProductScore, self).__init__()# 使用均匀分布随机初始化一个查询向量self.q = nn.Parameter(torch.rand(hidden_size, 1) * 0.5 - 0.25)  # Uniform initialization [-0.5, 0.5]def forward(self, inputs):"""输入:inputs: [batch_size, seq_len, hidden_size]输出:scores: [batch_size, seq_len]"""batch_size, seq_len, hidden_size = inputs.shape# 计算点积得分: scores shape: [batch_size, seq_len, 1]scores = torch.matmul(inputs, self.q)# 将 scores 转换为 [batch_size, seq_len]scores = scores.squeeze(-1)return scoresclass Attention(nn.Module):def __init__(self, hidden_size, use_additive=False):super(Attention, self).__init__()self.use_additive = use_additive# 使用加性模型或者点积模型if self.use_additive:self.scores = AdditiveScore(hidden_size)else:self.scores = DotProductScore(hidden_size)self._attention_weights = Nonedef forward(self, X, valid_lens):"""输入:- X:输入矩阵,shape=[batch_size, seq_len, hidden_size]- valid_lens:长度矩阵,shape=[batch_size]输出:- context:输出矩阵,表示的是注意力的加权平均的结果"""# 计算注意力得分: [batch_size, seq_len]scores = self.scores(X)#计算注意力分布# arrange: [1, seq_len], 比如 seq_len=4, arrange 变为 [0, 1, 2, 3]arrange = torch.arange(scores.shape[1], dtype=torch.float32, device=scores.device).unsqueeze(0)# valid_lens : [batch_size, 1]valid_lens = valid_lens.unsqueeze(1)# mask [batch_size, seq_len]mask = arrange < valid_lens# 设定mask为无效部分的分数为负无穷y = torch.full(scores.shape, float('-1e9'), dtype=scores.dtype, device=scores.device)# 使用mask处理scoresscores = torch.where(mask, scores, y)# attn_weights: [batch_size, seq_len]attn_weights = F.softmax(scores, dim=-1)self._attention_weights = attn_weights#加权求和# context: [batch_size, 1, hidden_size]context = torch.bmm(attn_weights.unsqueeze(1), X)# context: [batch_size, hidden_size]context = context.squeeze(1)return context@propertydef attention_weights(self):return self._attention_weightsclass Model_LSTMAttention(nn.Module):def __init__(self,hidden_size,embedding_size,vocab_size,n_classes=10,n_layers=1,use_additive=False,):super(Model_LSTMAttention, self).__init__()self.hidden_size = hidden_sizeself.embedding_size = embedding_sizeself.vocab_size = vocab_sizeself.n_classes = n_classesself.n_layers = n_layers# Define embedding layerself.embedding = nn.Embedding(self.vocab_size, self.embedding_size)# Define LSTMself.lstm = nn.LSTM(input_size=self.embedding_size,hidden_size=self.hidden_size,num_layers=self.n_layers,bidirectional=True,batch_first=True,)# Define Attention layerself.attention = Attention(self.hidden_size * 2, use_additive)# Define classification layerself.cls_fc = nn.Linear(self.hidden_size * 2, self.n_classes)def forward(self, inputs):input_ids, valid_lens = inputsbatch_size = input_ids.size(0)# Get word embeddingsembedded_input = self.embedding(input_ids)# Get LSTM outputlstm_output, (last_hiddens, last_cells) = self.lstm(embedded_input)# Apply Attention mechanismlstm_output = self.attention(lstm_output, valid_lens)# Get logits (raw class predictions)logits = self.cls_fc(lstm_output)return logits#模型训练
# 设置随机种子
np.random.seed(0)          # 设置NumPy随机种子
random.seed(0)             # 设置Python内置随机库的种子
torch.manual_seed(0)       # 设置CPU的随机种子
# 指定训练轮次
num_epochs = 3
# 指定学习率
learning_rate = 0.001
# 指定embedding的数量为词表长度
num_embeddings = len(word2id_dict)
# embedding向量的维度
input_size = 256
# LSTM网络隐状态向量的维度
hidden_size = 256# 实例化模型
model = Model_LSTMAttention(num_embeddings, input_size, hidden_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)# 实例化Runner
runner = Runner(model, train_loader, dev_loader, test_loader, criterion, optimizer)
# 模型训练
start_time = time.time()
runner.train(epochs=600)  # 训练模型
end_time = time.time()
runner.test()  # 测试模型import torch
import torch.nn.functional as F# 假设已有的映射字典
id2label = {0: "消极情绪", 1: "积极情绪"}
max_seq_len = 128  # 最大序列长度# 输入的文本
text = "this movie is so great. I watched it three times already"# 处理单条文本
sentence = text.split(" ")
words = [word2id_dict[word] if word in word2id_dict else word2id_dict['[UNK]'] for word in sentence]
words = words[:max_seq_len]  # 截断到最大长度
sequence_length = torch.tensor([len(words)], dtype=torch.int64)
words = torch.tensor(words, dtype=torch.int64).unsqueeze(0)  # 添加 batch 维度# 模型预测(假设 runner.predict 在 PyTorch 中实现为 model.forward)
# 假设 model 是你的 PyTorch 模型
logits = runner.predict((words, sequence_length))  # 调用模型的 forward 方法
max_label_id = torch.argmax(logits, dim=-1).item()  # 获取最大值索引
pred_label = id2label[max_label_id]  # 转换为对应标签print("Label: ", pred_label)

四 实验结果与结论

(一)基于双向LSTM模型完成文本分类任务

训练集和验证集上损失变化:

测试集上准确率和预测结果:

(二)基于双向LSTM和注意力机制的文本分类

①使用加性模型

在训练集和验证集上损失变化:

在测试集上准确率和模型预测结果:

②使用点积模型

在训练集和验证集上损失变化:

在测试集上准确率和模型预测结果:

不使用注意力得到的准确率是0.8390,使用注意力时,使用加性模型得到的准确率是0.8415,使用点积模型得到的准确率是0.8472。相比于不加注意力机制的模型,加入注意力机制的模型效果会更好些。同时本次实验结果来看,点积模型的表现比加性模型的变现好。

五 总结和心得体会

在这个实验中,我研究了一下datasetdataloader,dataset是和dataloader一起使用的。当我们拿到了数据集比如是train_data,首先需要将train_data放入自定义的dataset函数,这里面必须包含三个函数,分别是init函数,getitem函数和len函数。下面是一个实例:
from torch.utils.data import Dataset

import torch

class MyDataset(Dataset):

    def __init__(self, data):

        self.data = data

    def __len__(self):

        return len(self.data)

    def __getitem__(self, idx):

        # 假设 data 是一个包含文本和标签的元组

        text, label = self.data[idx]

        # 进行必要的数据处理

        return torch.tensor(text), torch.tensor(label)

init函数定义了变量data赋值为传入的数据data,len函数负责返回数据的长度,我认为getitem函数是主要的,它返回了data中某个索引对应的内容,因为dataset的主要职责是定义如何获取数据集中的一个样本。上面是一个简单的例子,没有对data的其他处理。拿今天这个实验来说,需要将train_data中的文本全部转换为数字,所以在自定义的dataset函数里面还需要加入一个words_to_id用于将文本转换为数字。

那init函数需要修改为:

examples是传入的data,需要对其进行word_to_id,将文本按照词汇表转换为数字索引,那getitem函数返回的就是修改之后的data的某个索引的数据了。

总结一下就是如果不需要对data进行修改,直接在init函数中定义变量data被赋值为转入的data,然后定义getitem方法用于返回某个idx的内容,以及定义len方法用于返回data的长度。

如果需要对数据进行一些操作,就需要在自定义的dataset函数中定义修改函数对data进行修改,为了方便getitem返回修改后的数据,在init函数中定义一个变量被赋值为修改之后的data。

Dataloader用于将 Dataset 中的样本批量化,就是直接使用函数Dataloader,里面传入的第一个参数就是dataset,剩下的还有batch_size,shuffle(是否对数据打乱)等。在这个实验中,创建Dataloader时还传入了一个函数collate_fn,如果不传递参数,Dataloader会使用默认的行为合并数据,就是将每个样本堆叠成一个批次;如果需要对加载的批次数据进行更复杂的操作,就可以定义一个函数,传递给Dataloader,这样在加载数据时就会调用这个函数来进行数据聚合。

Pytorch定义的嵌入函数nn.Embedding密集向量嵌入(dense vector embeddings)。nn.Embedding 是一个查找表,它将每个输入的索引映射到一个对应的连续空间中的低维向量。这些向量通常是通过训练学习得到的,而不是像 one-hot 编码那样使用稀疏的高维向量。

输入 输入的是一个 整数索引,通常是词汇表中单词的索引,形状为 [batch_size, seq_len]。

输出: 输出是对应索引的嵌入向量,形状为 [batch_size, seq_len, embedding_dim],其中 embedding_dim 是你在初始化 Embedding 时设定的向量维度。

举个例子:
embedding = nn.Embedding(5, 3, padding_idx=0) #词表大小、嵌入维度

input_ids = torch.tensor([[1, 2, 3], [4, 1, 0]]) #输入batch_size*seq_len

embedding_output = embedding(input_ids)

输出:

输出大小变为batch_size*seq_len*embedding_dim,我理解的就是batch_size为2就是有两个句子;seq_len为3就是每个句子都由三个单词组成,例如“movie is nice”;embedding_dim为3就是每个单词对应的索引都被扩充成了三维的向量,因为LSTM每个时间步的输入都应该是向量。

这个实验断续写了好几天,周六一下午把第一个实验的模型跑出来了,但是当时不理解聚合层是干啥呢,对于嵌入层的具体实现也不是很理解,更不知道为啥聚合层的时候还要掩码。然后是周日上午,我把注意力的代码整出来了,但是也没怎么分析注意力层。其实我之前就在B站自学过transformer和注意力,但是我不知道这个注意力kqv计算出来时干啥的。整完注意力代码,我就又开始研究模型的嵌入层,经过每个层的输出大小是什么,然后突然就开窍了,理解了整个模型的运作。

首先把文本根据词表转换为数字,然后把数字嵌入,但是嵌入之前需要对每个批次的数据进行截断和加长,使得他们的长度是一致的。嵌入之前的大小是batch_size*seq_len,seq_len就是每个句子单词的个数,嵌入之后的大小是batch_size*seq_len*embedding_dim,为啥要把每个数字加维度呢,因为模型处理的是向量。然后将嵌入之后放入LSTM模型,正反两个方向得到的大小是batch_size*seq_len*hidden_size*2,每个单词的维度发生了变化。接着放到聚合层,沿着时间步seq_len的方向将每个单词的向量进行加和求平均值,得到的大小变为了batch_size*hidden_size*2,但是在进行求和的时候,每个时间步也就是每个单词的权重是一样的,但是对于情感分类问题来说这是不合理的,因为有的单词对于判别情感来说是很重要的,但是有的单词却没有用。最后是一个全连接函数,输入维度是hidden_size*2,输出维度是2。

针对聚合层直接求平均出现的问题,后来引入了注意力层,batch_size*seq_len*hidden*2进行聚合的时候,我们考虑每个时间步也就是每个单词的重要性,这个要怎么考虑呢,首先引入一个查询向量q,q和每个时间步计算注意力分数,我比较喜欢点积模型,比较简单,直接q和每个时间步相乘即可,然后得到的值为每个时间步的注意力分数。然后需要经过softmax操作,将注意力分数转换为注意力分布,也就是每个时间步的权重,但是softmax之前需要对时间步进行掩码,将每个时间步的注意力得分加上掩码,因为有的时间步可能对应的单词是PAD,那他就不应该占有权重,其对应的掩码应该是负无穷,这样经过softmax就会变为0。最后进行加权求和,将掩码之后的注意力分布乘以每个时间步,这样得到的结果就是考虑了不同单词重要性的结果。然后将其全连接层,进行分类


http://www.ppmy.cn/ops/143991.html

相关文章

Kafka-创建topic源码

一、命令创建topic kafka-topics --create --topic quickstart-events --bootstrap-server cdh1:9092 --partitions 2 --replication-factor 2 二、kafka-topics脚本 exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.TopicCommand "$" 脚本中指定了…

高中数学刷题版:集合与函数概念-函数的概念[笔记总结-干货]

文章目录 一、题型归纳二、求函数值域方法总结三、方法使用归纳四、高于二次整式处理办法五、例题 一、题型归纳 1、求函数定义域 比较难的就是抽象函数的定义域问题 2、求函数值域 这个相对来讲比较复杂&#xff0c;也是考试重点 另外我们要明确&#xff0c;在高中的函数题…

全志H618 Android12修改doucmentsui功能菜单项

背景: 由于当前的文件管理器在我们的产品定义当中,某些界面有改动的需求,所以需要在Android12 rom中进行定制以符合当前产品定义。 需求: 在进入File文件管理器后,查看...功能菜单时,有不需要的功能菜单,需要隐藏,如:新建窗口、不显示的文件夹、故代码分析以及客制…

基于python实现用户注册与校验

实现用户注册与校验 common—源码 """ 一、基础功能需求1. 综合使用当前所学知识&#xff0c;如列表、字典、字符串、函数等&#xff0c;实现一个基于命令行的用户注册功能。 2. 用户注册时必须要输入用户名、密码、手机号码&#xff0c;用针对用户的输入必须…

命令行音乐库管理工具Beets

什么是 Beets &#xff1f; Beets 是一个音乐库管理系统和音乐文件元数据标签编辑器。它使用 MusicBrainz 数据库来自动填充音乐文件的元数据信息&#xff0c;并且可以通过插件系统来增加各种额外功能&#xff0c;比如自动下载专辑封面&#xff0c;歌词等。其目的是使您的音乐收…

只需3步,使用Stable Diffusion无限生成AI数字人视频

效果演示 先看效果&#xff0c;感兴趣的可以继续读下去。 没有找到可以上传视频的地方&#xff0c;大家打开这个链接可以看到&#xff1a;www.aliyundrive.com/s/CRBm5NL3x… 基本方法 搞一张照片&#xff0c;搞一段语音&#xff0c;合成照片和语音&#xff0c;同时让照片中…

计算机网络面经总结

目录 Tcp三次握手 为什么要三次握手&#xff1f; 半连接队列和全连接队列 三次握手可以携带数据吗&#xff1f; TCP四次挥手 为什么不能把服务端发送的ACK和FIN合并&#xff0c;变成三次挥手&#xff1f; 第二次挥手时服务端ACK没有送达客户端会怎么样&#xff1f; 为什…

【开源免费】基于SpringBoot+Vue.JS在线宠物用品交易网站(JAVA毕业设计)

本文项目编号 T 092 &#xff0c;文末自助获取源码 \color{red}{T092&#xff0c;文末自助获取源码} T092&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…