139 - Twitter Comment Sentiment Classification: Basic RNN Model
143 - LSTM Text Classification Model
[Reference notebook] 17-3Twitter评论情绪分类.ipynb
[Exported code]
# %% [markdown]
# # 139 - Twitter Comment Sentiment Classification

# %% [markdown]
# ## Reading and Processing the Data

# %%
import torch
import torchtext
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd

# %%
data = pd.read_csv('Tweets.csv')

# %%
data.head()

# %% [markdown]
# Keep two columns: the sentiment label and the tweet text

# %%
data = data[['airline_sentiment', 'text']]

# %%
data

# %% [markdown]
# Inspect the unique sentiment values

# %%
data.airline_sentiment.unique()

# %%
data.info()  # check for missing values

# %%
data.duplicated().sum()  # count duplicate rows

# %%
data.drop_duplicates(inplace=True)  # drop the duplicates

# %%
data.airline_sentiment.value_counts()  # samples per class

# %% [markdown]
# Encode the sentiment labels as the integers 0, 1, 2

# %%
label = pd.factorize(data.airline_sentiment)[0]
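
# %% [markdown]
# Quick sanity check (not part of the original notebook): `pd.factorize` also
# returns the unique values, so the integer codes can be mapped back to names.

# %%
codes, uniques = pd.factorize(data.airline_sentiment)
uniques  # the label order that the codes 0, 1, 2 refer to

# %% [markdown]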
# Simplify the text: keep only the English words and lowercase them

# %%
data.text

# %%
import re
pat = re.compile('[A-Za-z]+')  # matches runs of English letters

# %%
# Text preprocessing function
def pre_text(text):
    text = pat.findall(text)          # extract all English words
    text = [w.lower() for w in text]  # lowercase each word
    return text
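
# %% [markdown]
# Sanity check on a hypothetical example string (not from the dataset):

# %%
pre_text("@united Thanks, GREAT flight!!")  # -> ['united', 'thanks', 'great', 'flight']

# %%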
x = data.text.apply(pre_text)  # apply the function to every tweet

# %%
x

# %% [markdown]
# ## Building the Vocabulary
# vocab: assign every word a unique index

# %%
word_set = set()
for t in x:  # x: series of token lists; t: the tokens of one tweet
    for word in t:
        word_set.add(word)

# %%
word_set  # all unique words

# %%
max_word = len(word_set) + 1  # +1 reserves index 0 for padding/unknown words

# %%
word_list = list(word_set)  # set -> list, so every word has a position

# %%
word_list.index('you')

# %%
# Dict comprehension: word -> index (equivalent to word_list.index(w) + 1,
# but O(n) instead of O(n^2)); index 0 stays reserved
word_index = {w: i + 1 for i, w in enumerate(word_list)}

# %%
word_index

# %%
x = x.apply(lambda t: [word_index.get(w, 0) for w in t])  # encode each word; unknown words map to 0

# %%
x
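
# %% [markdown]
# Optional check (not in the original notebook): build the reverse mapping and
# decode one encoded tweet back into words.

# %%
index_word = {i: w for w, i in word_index.items()}  # hypothetical helper: index -> word
[index_word.get(i, '<unk>') for i in x.iloc[0]]

# %%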
max_len = max(len(t) for t in x)

# %%
max_len

# %%
pad_x = [t + (max_len - len(t)) * [0] for t in x]  # right-pad every sequence with 0s up to max_len

# %%
pad_x = np.array(pad_x)  # list of lists -> ndarray

# %%
pad_x.shape  # 14452 tweets, each padded to length 34

# %%
label.shape

# %% [markdown]
# ## Train/Test Split

# %% [markdown]
# pip install scikit-learn  (machine learning library; installed as scikit-learn, imported as sklearn)

# %%
from sklearn.model_selection import train_test_split

# %%
x_train, x_test, y_train, y_test = train_test_split(pad_x, label)
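
# %% [markdown]
# Optional variant (an assumption, not in the original notebook): `random_state`
# makes the split reproducible and `stratify=label` keeps the class ratios equal
# in both splits.

# %%
x_train, x_test, y_train, y_test = train_test_split(
    pad_x, label, random_state=42, stratify=label)

# %%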
x_train.shape, x_test.shape

# %% [markdown]
# Define a Dataset class

# %%
class Mydataset(torch.utils.data.Dataset):
    def __init__(self, text_array, label_array):
        self.text_array = text_array
        self.label_array = label_array

    def __getitem__(self, index):
        text = torch.LongTensor(self.text_array[index])
        label = self.label_array[index]
        return text, label

    def __len__(self):
        return len(self.label_array)

# %%
train_ds = Mydataset(x_train, y_train)
test_ds = Mydataset(x_test, y_test)
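
# %% [markdown]
# Sanity check: one sample should be a (LongTensor of length max_len, label) pair.

# %%
text0, label0 = train_ds[0]
text0.shape, text0.dtype, label0

# %%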
BATCH_SIZE = 32

# %%
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
test_dl = torch.utils.data.DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)
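
# %% [markdown]
# One batch from the loader: inputs of shape (BATCH_SIZE, max_len), targets of shape (BATCH_SIZE,).

# %%
xb, yb = next(iter(train_dl))
xb.shape, yb.shape

# %% [markdown]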
# ## Basic Text Classification Model

# %%
embedding_dim = 100

# %%
# This model ignores word order: it flattens all the word embeddings of a tweet
class Basic_Net(nn.Module):
    def __init__(self):
        super(Basic_Net, self).__init__()
        # Embedding layer: maps each token id to an embedding_dim vector
        self.embedding = nn.Embedding(num_embeddings=max_word, embedding_dim=embedding_dim)
        # Hidden linear layer
        self.fc1 = nn.Linear(max_len * embedding_dim, 1024)
        # Output layer
        self.fc2 = nn.Linear(1024, 3)

    def forward(self, x):
        x = self.embedding(x)
        x = x.view(x.size(0), -1)  # flatten (max_len, embedding_dim) into one vector per sample
        x = F.relu(self.fc1(x))
        x = self.fc2(x)  # no activation here: CrossEntropyLoss expects raw logits
        return x
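
# %% [markdown]
# Shape check with a dummy batch of random token ids (hypothetical input, CPU only):

# %%
Basic_Net()(torch.randint(0, max_word, (4, max_len))).shape  # expected: torch.Size([4, 3])

# %%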
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Basic_Net().to(device)

# %%
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# %%
def fit(epoch, model, train_dl, test_dl):
    correct = 0
    total = 0
    running_loss = 0.0
    model.train()
    for x, y in train_dl:
        x, y = x.to(device), y.to(device)
        y_pred = model(x)
        loss = loss_fn(y_pred, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        with torch.no_grad():
            y_pred = torch.argmax(y_pred, dim=1)
            correct += (y_pred == y).sum().item()
            total += y.size(0)
            running_loss += loss.item()
    # per-sample averages (summed batch losses divided by the dataset size)
    epoch_loss = running_loss / len(train_dl.dataset)
    epoch_acc = correct / total

    test_correct = 0
    test_total = 0
    test_running_loss = 0.0
    model.eval()
    with torch.no_grad():
        for x, y in test_dl:
            x, y = x.to(device), y.to(device)
            y_pred = model(x)
            loss = loss_fn(y_pred, y)
            y_pred = torch.argmax(y_pred, dim=1)
            test_correct += (y_pred == y).sum().item()
            test_total += y.size(0)
            test_running_loss += loss.item()
    test_loss = test_running_loss / len(test_dl.dataset)
    test_acc = test_correct / test_total

    print(f"Epoch {epoch+1} loss: {epoch_loss:.4f} acc: {epoch_acc:.4f} | "
          f"Test loss: {test_loss:.4f} acc: {test_acc:.4f}")
    return epoch_loss, epoch_acc, test_loss, test_acc

# %%
epochs = 10

# %%
train_loss = []
train_acc = []
test_loss = []
test_acc = []

for epoch in range(epochs):
    epoch_loss, epoch_acc, epoch_test_loss, epoch_test_acc = fit(epoch, model, train_dl, test_dl)
    train_loss.append(epoch_loss)
    train_acc.append(epoch_acc)
    test_loss.append(epoch_test_loss)
    test_acc.append(epoch_test_acc)
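
# %% [markdown]
# The flatten-and-Linear design makes fc1 very large (max_len * embedding_dim inputs
# feeding 1024 units), which is one reason the model can memorize the training set.
# A quick parameter count shows where the capacity sits:

# %%
sum(p.numel() for p in model.parameters())  # total trainable parameters

# %% [markdown]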
# The basic model overfits severely.

# %% [markdown]
# # 143 - LSTM Text Classification Model

# %%
embedding_dim = 100
hidden_size = 200

# %%
class LSTM_Net(nn.Module):
    def __init__(self, max_word, embedding_dim):
        super(LSTM_Net, self).__init__()
        self.embedding = nn.Embedding(max_word, embedding_dim)  # (batch, max_len, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, batch_first=True)
        self.fc1 = nn.Linear(hidden_size, 256)
        self.fc2 = nn.Linear(256, 3)

    def forward(self, x):
        x = self.embedding(x)
        x, _ = self.lstm(x)  # x -> (batch, time_step, hidden_size)
        x = x[:, -1, :]      # keep the last time step (note: sequences are right-padded)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# %%
model = LSTM_Net(max_word, embedding_dim).to(device)
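
# %% [markdown]
# Side note (a sketch, assuming a single-layer, unidirectional LSTM): `nn.LSTM` also
# returns the final hidden state `h_n`, and `h_n[-1]` equals `output[:, -1, :]`, so
# either can feed the classifier head.

# %%
with torch.no_grad():
    demo = torch.randint(0, max_word, (2, max_len)).to(device)  # dummy batch
    out, (h_n, c_n) = model.lstm(model.embedding(demo))
    print(torch.allclose(out[:, -1, :], h_n[-1]))  # True

# %%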
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# %%
# The fit function defined above is reused as-is: it reads the global loss_fn and
# optimizer, which now point at the LSTM model's parameters.
epochs = 10
train_loss = []
train_acc = []
test_loss = []
test_acc = []

for epoch in range(epochs):
    epoch_loss, epoch_acc, epoch_test_loss, epoch_test_acc = fit(epoch, model, train_dl, test_dl)
    train_loss.append(epoch_loss)
    train_acc.append(epoch_acc)
    test_loss.append(epoch_test_loss)
    test_acc.append(epoch_test_acc)

# %%
import matplotlib.pyplot as plt

# %%
plt.plot(range(epochs), train_acc, c='r', label='Training Accuracy')
plt.plot(range(epochs), test_acc, c='b', label='Test Accuracy')
plt.title('Training and Test Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()
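
# %% [markdown]
# Inference sketch (this cell is not in the original notebook; it reuses pre_text,
# word_index, max_len and the trained LSTM model from above) for classifying a new,
# hypothetical tweet.

# %%
def predict(text):
    tokens = pre_text(text)
    ids = [word_index.get(w, 0) for w in tokens]  # encode; unknown words -> 0
    ids = ids[:max_len]                           # truncate to the training length
    ids = ids + [0] * (max_len - len(ids))        # right-pad, matching training
    t = torch.LongTensor(ids).unsqueeze(0).to(device)  # batch of one
    model.eval()
    with torch.no_grad():
        return model(t).argmax(dim=1).item()  # integer code from pd.factorize above

predict("@united the flight was delayed for three hours")

# %%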