Deep Learning Lab 11: Convolutional Neural Networks (2) — Handwritten Digit Recognition with LeNet


Contents

1. Data

2. Model Construction

3. Model Training and Evaluation

4. Printing the Parameter Count and Computational Cost

5. Model Prediction

Appendix: Complete Runnable Code

Rough steps of the experiment:

1. Data

Download site: MNIST dataset

The original official site no longer serves the dataset (it now returns 403), so a similar mirror site was used for the download instead.

After downloading, the data is read, preprocessed, and split into training, validation, and test sets:

import gzip

import numpy as np
import PIL.Image as Image
import torchvision.transforms as transforms
from matplotlib import pyplot as plt
from torch.utils.data import Dataset


# Read the MNIST image data
def load_images(file_path):
    with gzip.open(file_path, 'rb') as f:
        magic_number = int.from_bytes(f.read(4), 'big')  # read the magic number
        num_images = int.from_bytes(f.read(4), 'big')    # number of images
        num_rows = int.from_bytes(f.read(4), 'big')      # image height
        num_cols = int.from_bytes(f.read(4), 'big')      # image width
        images = np.frombuffer(f.read(), dtype=np.uint8)           # read the pixel data
        images = images.reshape(num_images, num_rows, num_cols)    # reshape to (num_images, 28, 28)
    return images


# Read the MNIST label data
def load_labels(file_path):
    with gzip.open(file_path, 'rb') as f:
        magic_number = int.from_bytes(f.read(4), 'big')  # read the magic number
        num_labels = int.from_bytes(f.read(4), 'big')    # number of labels
        labels = np.frombuffer(f.read(), dtype=np.uint8)  # read the label data
    return labels


# Load the data
images = load_images('./MNIST/raw/train-images-idx3-ubyte.gz')
labels = load_labels('./MNIST/raw/train-labels-idx1-ubyte.gz')
train_images = images[:1000]
train_labels = labels[:1000]
dev_images = images[1000:1200]
dev_labels = labels[1000:1200]
test_images = images[1200:1400]
test_labels = labels[1200:1400]

# Print the dataset split sizes
print(f'Length of train/dev/test set: {len(train_images)}/{len(dev_images)}/{len(test_images)}')

image, label = train_images[2], train_labels[2]
image, label = np.array(image).astype('float32'), int(label)
# Make sure the raw pixels form a [28, 28] image (a flat length-784 vector
# would also be handled here; the loader has already reshaped the data)
image = np.reshape(image, [28, 28])
image = Image.fromarray(image.astype('uint8'), mode='L')
print("The number in the picture is {}".format(label))
plt.figure(figsize=(5, 5))
plt.imshow(image)
plt.show()

# Define the training, validation, and test sets
train_set = {"images": train_images, "labels": train_labels}
dev_set = {"images": dev_images, "labels": dev_labels}
test_set = {"images": test_images, "labels": test_labels}

# Data preprocessing: resize to 32x32 (LeNet's input size), convert to tensor, normalize
transform = transforms.Compose(
    [transforms.Resize(32), transforms.ToTensor(), transforms.Normalize(mean=[0.5], std=[0.5])])


class MNIST_dataset(Dataset):
    def __init__(self, dataset, transforms, mode='train'):
        self.mode = mode
        self.transforms = transforms
        self.dataset = dataset

    def __getitem__(self, idx):
        # Fetch the image and label from the dictionary
        image, label = self.dataset["images"][idx], self.dataset["labels"][idx]
        image, label = np.array(image).astype('float32'), int(label)
        image = np.reshape(image, [28, 28])
        image = Image.fromarray(image.astype('uint8'), mode='L')
        image = self.transforms(image)
        return image, label

    def __len__(self):
        # Number of images
        return len(self.dataset["images"])


# Build the MNIST datasets
train_dataset = MNIST_dataset(dataset=train_set, transforms=transform, mode='train')
test_dataset = MNIST_dataset(dataset=test_set, transforms=transform, mode='test')
dev_dataset = MNIST_dataset(dataset=dev_set, transforms=transform, mode='dev')

Output:

The dataset is split successfully.


The first image is loaded and displayed.
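
Before moving on, it is worth sanity-checking what the Dataset actually yields. A minimal sketch, assuming the train_dataset built above:

# Sanity check: after Resize(32) + ToTensor + Normalize, each sample should be
# a [1, 32, 32] tensor with values roughly in [-1, 1]
sample_image, sample_label = train_dataset[0]
print(sample_image.shape)                                    # torch.Size([1, 32, 32])
print(sample_image.min().item(), sample_image.max().item())  # about -1.0 and 1.0
print(sample_label)                                          # an int in 0..9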

2. Model Construction

import numpy as np
import torch
import torch.nn.functional as F
import torch.nn as nn
from CNN_op import Conv2D, Pool2D


class Model_LeNet(nn.Module):
    def __init__(self, in_channels, num_classes=10):
        super(Model_LeNet, self).__init__()
        # Convolutional layer: 6 output channels, 5x5 kernel
        self.conv1 = Conv2D(in_channels=in_channels, out_channels=6, kernel_size=5)
        # Pooling layer: 2x2 window, stride 2
        self.pool2 = Pool2D(size=(2, 2), mode='max', stride=2)
        # Convolutional layer: 6 input channels, 16 output channels, 5x5 kernel, stride 1
        self.conv3 = Conv2D(in_channels=6, out_channels=16, kernel_size=5, stride=1)
        # Pooling layer: 2x2 window, stride 2
        self.pool4 = Pool2D(size=(2, 2), mode='avg', stride=2)
        # Convolutional layer: 16 input channels, 120 output channels, 5x5 kernel
        self.conv5 = Conv2D(in_channels=16, out_channels=120, kernel_size=5, stride=1)
        # Fully connected layer: 120 inputs, 84 outputs
        self.linear6 = nn.Linear(120, 84)
        # Fully connected layer: 84 inputs, num_classes outputs
        self.linear7 = nn.Linear(84, num_classes)

    def forward(self, x):
        # C1: convolution + activation
        output = F.relu(self.conv1(x))
        # S2: pooling
        output = self.pool2(output)
        # C3: convolution + activation
        output = F.relu(self.conv3(output))
        # S4: pooling
        output = self.pool4(output)
        # C5: convolution + activation
        output = F.relu(self.conv5(output))
        # Flatten [B, C, H, W] -> [B, C*H*W] (H = W = 1 at this point)
        output = torch.squeeze(output, dim=3)
        output = torch.squeeze(output, dim=2)
        # F6: fully connected layer
        output = F.relu(self.linear6(output))
        # F7: fully connected layer
        output = self.linear7(output)
        return output


class PyTorch_LeNet(nn.Module):
    def __init__(self, in_channels, num_classes=10):
        super(PyTorch_LeNet, self).__init__()
        # Convolutional layer: 6 output channels, 5x5 kernel
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=6, kernel_size=5)
        # Pooling layer: 2x2 window, stride 2
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        # Convolutional layer: 6 input channels, 16 output channels, 5x5 kernel
        self.conv3 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
        # Pooling layer: 2x2 window, stride 2
        self.pool4 = nn.AvgPool2d(kernel_size=2, stride=2)
        # Convolutional layer: 16 input channels, 120 output channels, 5x5 kernel
        self.conv5 = nn.Conv2d(in_channels=16, out_channels=120, kernel_size=5)
        # Fully connected layer: 120 inputs, 84 outputs
        self.linear6 = nn.Linear(in_features=120, out_features=84)
        # Fully connected layer: 84 inputs, num_classes outputs
        self.linear7 = nn.Linear(in_features=84, out_features=num_classes)

    def forward(self, x):
        # C1: convolution + activation
        output = F.relu(self.conv1(x))
        # S2: pooling
        output = self.pool2(output)
        # C3: convolution + activation
        output = F.relu(self.conv3(output))
        # S4: pooling
        output = self.pool4(output)
        # C5: convolution + activation
        output = F.relu(self.conv5(output))
        # Flatten [B, C, H, W] -> [B, C*H*W] (H = W = 1 at this point)
        output = torch.squeeze(output, dim=3)
        output = torch.squeeze(output, dim=2)
        # F6: fully connected layer
        output = F.relu(self.linear6(output))
        # F7: fully connected layer
        output = self.linear7(output)
        return output


# Print the output shape and parameters of every layer.
# np.random creates a random array to use as input data.
inputs = np.random.randn(*[1, 1, 32, 32])
inputs = inputs.astype('float32')
model = PyTorch_LeNet(in_channels=1, num_classes=10)
layer_names = []
for name, layer in model.named_children():
    layer_names.append(name)
print(layer_names)
x = torch.tensor(inputs)
for name, layer in model.named_children():
    try:
        x = layer(x)
    except Exception:
        # Linear layers need a flattened input
        x = torch.reshape(x, [x.shape[0], -1])
        x = layer(x)
    param_names = []
    params = []
    for p_name, p in layer.named_parameters():
        param_names.append(p_name)
        params.append(p)
    if len(params) == 2:
        # Weight and bias shapes
        print(name, x.shape, params[0].shape, params[1].shape)
    else:
        # Pooling layers have no parameters
        print(name, x.shape)

Test output:
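
The shapes printed above can also be cross-checked against hand-computed parameter counts. A small sketch of the arithmetic, using only the layer definitions from the model (a convolution has out_channels x (in_channels x k x k) weights plus out_channels biases; a linear layer has out_features x in_features weights plus out_features biases):

# Hand-computed parameter counts for the LeNet defined above
conv1 = 6 * (1 * 5 * 5) + 6        # 156
conv3 = 16 * (6 * 5 * 5) + 16      # 2,416
conv5 = 120 * (16 * 5 * 5) + 120   # 48,120
linear6 = 84 * 120 + 84            # 10,164
linear7 = 10 * 84 + 10             # 850
print(conv1 + conv3 + conv5 + linear6 + linear7)  # 61,706 in total
# Should match what the framework reports:
print(sum(p.numel() for p in model.parameters()))  # 61,706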

3. Model Training and Evaluation

The code is as follows:

# Model training
import time

import torch.optim as opt
import torch.utils.data as data
from nndl_3 import RunnerV3, Accuracy

batch_size = 16
train_loader = data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_loader = data.DataLoader(dev_dataset, batch_size=batch_size)
test_loader = data.DataLoader(test_dataset, batch_size=batch_size)

seed = 300
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)  # also seed CUDA when a GPU is used
torch.backends.cudnn.deterministic = True  # make cuDNN computations deterministic
torch.backends.cudnn.benchmark = False     # disable auto-tuning that can cause nondeterminism

# Train with the hand-written convolution/pooling operators =====================
# print("Training with the hand-written operators: ================================")
# time1 = time.time()
# model = Model_LeNet(in_channels=1, num_classes=10)
# # Define the optimizer
# lr = 0.1
# optimizer = opt.SGD(lr=lr, params=model.parameters())
# # Define the loss function
# loss_fn = F.cross_entropy
# # Define the evaluation metric
# metric = Accuracy(is_logist=True)
# # Instantiate RunnerV3 with the training configuration
# runner = RunnerV3(model, optimizer, loss_fn, metric)
# # Start training
# log_steps = 15
# eval_steps = 15
# runner.train(train_loader, dev_loader, num_epochs=10, log_steps=log_steps,
#              eval_steps=eval_steps, save_path="best_model.pdparams")
# time2 = time.time()
# print("Running time with the hand-written operators:", time2 - time1)
# # Visualize the loss
# from nndl_3 import plot
# plot(runner, 'cnn-loss1.pdf')
#
# # Load the best model
# runner.load_model('best_model.pdparams')
# # Evaluate the model
# score, loss = runner.evaluate(test_loader)
# print("[Test] accuracy/loss: {:.4f}/{:.4f}".format(score, loss))
# print("=================================================================")

# Train with the built-in PyTorch operators =====================================
print("Training with the framework operators: =======================================")
time3 = time.time()
model = PyTorch_LeNet(in_channels=1, num_classes=10)
# Define the optimizer
lr = 0.1
optimizer = opt.SGD(lr=lr, params=model.parameters())
# Define the loss function
loss_fn = F.cross_entropy
# Define the evaluation metric
metric = Accuracy(is_logist=True)
# Instantiate RunnerV3 with the training configuration
runner = RunnerV3(model, optimizer, loss_fn, metric)
# Start training
log_steps = 15
eval_steps = 15
runner.train(train_loader, dev_loader, num_epochs=10, log_steps=log_steps,
             eval_steps=eval_steps, save_path="best_model.pdparams")
time4 = time.time()
print("Running time with the framework operators:", time4 - time3)
# Visualize the loss
from nndl_3 import plot
plot(runner, 'cnn-loss1.pdf')
# Load the best model
runner.load_model('best_model.pdparams')
# Evaluate the model
score, loss = runner.evaluate(test_loader)
print("[Test] accuracy/loss: {:.4f}/{:.4f}".format(score, loss))

Training time comparison:

Training with the Conv2D and Pool2D operators hand-written in the previous lab is painfully slow, so the PyTorch operators were used instead. The two runs differ by about 7 minutes, which shows how fast the operators of a mature framework are.

Output:

 

Note:

During the experiment something unexpected happened: the loss and score on the dev set did not change across iterations. Going through the code several times did not reveal the problem. The same thing had happened earlier with the feed-forward network; at first the batch size and learning rate seemed to be the cause, but they turned out not to be.

While comparing the hand-written convolution operators against the framework operators, model was accidentally defined a second time, which produced the following structure:

Although the model passed into the Runner instance was always an instance of PyTorch_LeNet, once model was re-instantiated the optimizer's parameters no longer tracked the new instance. The optimizer binds the parameters of whichever model exists when it is constructed; when a new model is instantiated later, the optimizer still holds the old model's parameters rather than the new ones, and it does not automatically rebind them. The new model's weights therefore never get updated during training, which is why the dev accuracy stayed flat.
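
The pitfall is easy to reproduce in isolation. A minimal sketch (the net variable here is hypothetical, not part of the experiment code):

import torch
import torch.nn as nn
import torch.optim as opt

net = nn.Linear(4, 2)
optimizer = opt.SGD(net.parameters(), lr=0.1)  # binds THIS instance's parameters

net = nn.Linear(4, 2)  # rebinding the name does not rebind the optimizer
loss = net(torch.randn(8, 4)).sum()
loss.backward()
optimizer.step()  # steps over the old parameters (which received no gradients);
                  # the new net is never updated

optimizer = opt.SGD(net.parameters(), lr=0.1)  # fix: re-create the optimizer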

Note:

During training it also turned out that the final accuracy depends not only on lr but also on the random seed I set.

With the common seed value 42 the accuracy was only 85%; with seed 300 it reached 92%. Setting a random seed does not mean every training run behaves the same: the choice of seed itself affects the result. The same seed value reproduces the same result, but different seed values lead to different random initialization, data shuffling, and optimizer behavior, all of which influence the trained model.
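
This is easy to verify directly: the same seed reproduces the same initial weights, while different seeds start the network from different points. A minimal sketch:

import torch

torch.manual_seed(42)
w_a = torch.nn.Conv2d(1, 6, 5).weight.detach().clone()
torch.manual_seed(42)
w_b = torch.nn.Conv2d(1, 6, 5).weight.detach().clone()
torch.manual_seed(300)
w_c = torch.nn.Conv2d(1, 6, 5).weight.detach().clone()

print(torch.equal(w_a, w_b))  # True: the same seed is reproducible
print(torch.equal(w_a, w_c))  # False: a different seed initializes differently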

4. Printing the Parameter Count and Computational Cost

The code is as follows:

# =================================================================
# Parameter count
import torch
import torchprofile
from torchsummary import summary

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
Torch_model = PyTorch_LeNet(in_channels=1, num_classes=10).to(device)
summary(Torch_model, (1, 32, 32))

# ==================================================================
# Computational cost: torchprofile counts multiply-accumulate operations
# (MACs); FLOPs is roughly twice the MAC count
model = PyTorch_LeNet(in_channels=1, num_classes=10)
dummy_input = torch.randn(1, 1, 32, 32)
macs = torchprofile.profile_macs(model, dummy_input)
print(f"MACs: {macs}")

Output:

 

5. Model Prediction

Use the saved model to predict one sample from the test set and inspect how well the model does.

The code is as follows:

# Model prediction
# Fetch the first batch of the test set
X, label = next(iter(test_loader))
logits = runner.predict(X)
# Multi-class: use softmax to turn logits into probabilities
pred = F.softmax(logits, dim=1)
# Take the class with the highest probability for sample index 2
pred_class = torch.argmax(pred[2]).numpy()
label = label[2].numpy()
# Print the true and the predicted class
print("The true category is {} and the predicted category is {}".format(label, pred_class))
# Visualize the image
plt.figure(figsize=(2, 2))
image, label = test_images[2], test_labels[2]
image = np.array(image).astype('float32')
image = np.reshape(image, [28, 28])
image = Image.fromarray(image.astype('uint8'), mode='L')
plt.imshow(image)
plt.show()

Output:

The prediction is correct!

Appendix: Complete Runnable Code

Main program:

import gzip
import time

import numpy as np
import torchprofile
from matplotlib import pyplot as plt
import PIL.Image as Image
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader


# Read the MNIST image data
def load_images(file_path):
    with gzip.open(file_path, 'rb') as f:
        magic_number = int.from_bytes(f.read(4), 'big')  # read the magic number
        num_images = int.from_bytes(f.read(4), 'big')    # number of images
        num_rows = int.from_bytes(f.read(4), 'big')      # image height
        num_cols = int.from_bytes(f.read(4), 'big')      # image width
        images = np.frombuffer(f.read(), dtype=np.uint8)           # read the pixel data
        images = images.reshape(num_images, num_rows, num_cols)    # reshape to (num_images, 28, 28)
    return images


# Read the MNIST label data
def load_labels(file_path):
    with gzip.open(file_path, 'rb') as f:
        magic_number = int.from_bytes(f.read(4), 'big')  # read the magic number
        num_labels = int.from_bytes(f.read(4), 'big')    # number of labels
        labels = np.frombuffer(f.read(), dtype=np.uint8)  # read the label data
    return labels


# Load the data
images = load_images('./MNIST/raw/train-images-idx3-ubyte.gz')
labels = load_labels('./MNIST/raw/train-labels-idx1-ubyte.gz')
train_images = images[:1000]
train_labels = labels[:1000]
dev_images = images[1000:1200]
dev_labels = labels[1000:1200]
test_images = images[1200:1400]
test_labels = labels[1200:1400]

# Print the dataset split sizes
print(f'Length of train/dev/test set: {len(train_images)}/{len(dev_images)}/{len(test_images)}')

image, label = train_images[2], train_labels[2]
image, label = np.array(image).astype('float32'), int(label)
# Make sure the raw pixels form a [28, 28] image
image = np.reshape(image, [28, 28])
image = Image.fromarray(image.astype('uint8'), mode='L')
print("The number in the picture is {}".format(label))
plt.figure(figsize=(5, 5))
plt.imshow(image)
plt.show()

# Define the training, validation, and test sets
train_set = {"images": train_images, "labels": train_labels}
dev_set = {"images": dev_images, "labels": dev_labels}
test_set = {"images": test_images, "labels": test_labels}

# Data preprocessing: resize to 32x32 (LeNet's input size), convert to tensor, normalize
transform = transforms.Compose(
    [transforms.Resize(32), transforms.ToTensor(), transforms.Normalize(mean=[0.5], std=[0.5])])


class MNIST_dataset(Dataset):
    def __init__(self, dataset, transforms, mode='train'):
        self.mode = mode
        self.transforms = transforms
        self.dataset = dataset

    def __getitem__(self, idx):
        # Fetch the image and label from the dictionary
        image, label = self.dataset["images"][idx], self.dataset["labels"][idx]
        image, label = np.array(image).astype('float32'), int(label)
        image = np.reshape(image, [28, 28])
        image = Image.fromarray(image.astype('uint8'), mode='L')
        image = self.transforms(image)
        return image, label

    def __len__(self):
        # Number of images
        return len(self.dataset["images"])


# Build the MNIST datasets
train_dataset = MNIST_dataset(dataset=train_set, transforms=transform, mode='train')
test_dataset = MNIST_dataset(dataset=test_set, transforms=transform, mode='test')
dev_dataset = MNIST_dataset(dataset=dev_set, transforms=transform, mode='dev')

import torch
import torch.nn.functional as F
import torch.nn as nn
from CNN_op import Conv2D, Pool2D


class Model_LeNet(nn.Module):
    def __init__(self, in_channels, num_classes=10):
        super(Model_LeNet, self).__init__()
        # Convolutional layer: 6 output channels, 5x5 kernel
        self.conv1 = Conv2D(in_channels=in_channels, out_channels=6, kernel_size=5)
        # Pooling layer: 2x2 window, stride 2
        self.pool2 = Pool2D(size=(2, 2), mode='max', stride=2)
        # Convolutional layer: 6 input channels, 16 output channels, 5x5 kernel, stride 1
        self.conv3 = Conv2D(in_channels=6, out_channels=16, kernel_size=5, stride=1)
        # Pooling layer: 2x2 window, stride 2
        self.pool4 = Pool2D(size=(2, 2), mode='avg', stride=2)
        # Convolutional layer: 16 input channels, 120 output channels, 5x5 kernel
        self.conv5 = Conv2D(in_channels=16, out_channels=120, kernel_size=5, stride=1)
        # Fully connected layer: 120 inputs, 84 outputs
        self.linear6 = nn.Linear(120, 84)
        # Fully connected layer: 84 inputs, num_classes outputs
        self.linear7 = nn.Linear(84, num_classes)

    def forward(self, x):
        # C1: convolution + activation
        output = F.relu(self.conv1(x))
        # S2: pooling
        output = self.pool2(output)
        # C3: convolution + activation
        output = F.relu(self.conv3(output))
        # S4: pooling
        output = self.pool4(output)
        # C5: convolution + activation
        output = F.relu(self.conv5(output))
        # Flatten [B, C, H, W] -> [B, C*H*W] (H = W = 1 at this point)
        output = torch.squeeze(output, dim=3)
        output = torch.squeeze(output, dim=2)
        # F6: fully connected layer
        output = F.relu(self.linear6(output))
        # F7: fully connected layer
        output = self.linear7(output)
        return output


class PyTorch_LeNet(nn.Module):
    def __init__(self, in_channels, num_classes=10):
        super(PyTorch_LeNet, self).__init__()
        # Convolutional layer: 6 output channels, 5x5 kernel
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=6, kernel_size=5)
        # Pooling layer: 2x2 window, stride 2
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        # Convolutional layer: 6 input channels, 16 output channels, 5x5 kernel
        self.conv3 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
        # Pooling layer: 2x2 window, stride 2
        self.pool4 = nn.AvgPool2d(kernel_size=2, stride=2)
        # Convolutional layer: 16 input channels, 120 output channels, 5x5 kernel
        self.conv5 = nn.Conv2d(in_channels=16, out_channels=120, kernel_size=5)
        # Fully connected layer: 120 inputs, 84 outputs
        self.linear6 = nn.Linear(in_features=120, out_features=84)
        # Fully connected layer: 84 inputs, num_classes outputs
        self.linear7 = nn.Linear(in_features=84, out_features=num_classes)

    def forward(self, x):
        # C1: convolution + activation
        output = F.relu(self.conv1(x))
        # S2: pooling
        output = self.pool2(output)
        # C3: convolution + activation
        output = F.relu(self.conv3(output))
        # S4: pooling
        output = self.pool4(output)
        # C5: convolution + activation
        output = F.relu(self.conv5(output))
        # Flatten [B, C, H, W] -> [B, C*H*W] (H = W = 1 at this point)
        output = torch.squeeze(output, dim=3)
        output = torch.squeeze(output, dim=2)
        # F6: fully connected layer
        output = F.relu(self.linear6(output))
        # F7: fully connected layer
        output = self.linear7(output)
        return output


# Print the output shape and parameters of every layer.
# np.random creates a random array to use as input data.
inputs = np.random.randn(*[1, 1, 32, 32])
inputs = inputs.astype('float32')
model = PyTorch_LeNet(in_channels=1, num_classes=10)
layer_names = []
for name, layer in model.named_children():
    layer_names.append(name)
print(layer_names)
x = torch.tensor(inputs)
for name, layer in model.named_children():
    try:
        x = layer(x)
    except Exception:
        # Linear layers need a flattened input
        x = torch.reshape(x, [x.shape[0], -1])
        x = layer(x)
    param_names = []
    params = []
    for p_name, p in layer.named_parameters():
        param_names.append(p_name)
        params.append(p)
    if len(params) == 2:
        # Weight and bias shapes
        print(name, x.shape, params[0].shape, params[1].shape)
    else:
        # Pooling layers have no parameters
        print(name, x.shape)

# ==========================================================================
# Model training
import torch.optim as opt
import torch.utils.data as data
from nndl_3 import RunnerV3, Accuracy

batch_size = 16
train_loader = data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_loader = data.DataLoader(dev_dataset, batch_size=batch_size)
test_loader = data.DataLoader(test_dataset, batch_size=batch_size)

seed = 300
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)  # also seed CUDA when a GPU is used
torch.backends.cudnn.deterministic = True  # make cuDNN computations deterministic
torch.backends.cudnn.benchmark = False     # disable auto-tuning that can cause nondeterminism

# Train with the hand-written convolution/pooling operators =====================
# print("Training with the hand-written operators: ================================")
# time1 = time.time()
# model = Model_LeNet(in_channels=1, num_classes=10)
# # Define the optimizer
# lr = 0.1
# optimizer = opt.SGD(lr=lr, params=model.parameters())
# # Define the loss function
# loss_fn = F.cross_entropy
# # Define the evaluation metric
# metric = Accuracy(is_logist=True)
# # Instantiate RunnerV3 with the training configuration
# runner = RunnerV3(model, optimizer, loss_fn, metric)
# # Start training
# log_steps = 15
# eval_steps = 15
# runner.train(train_loader, dev_loader, num_epochs=10, log_steps=log_steps,
#              eval_steps=eval_steps, save_path="best_model.pdparams")
# time2 = time.time()
# print("Running time with the hand-written operators:", time2 - time1)
# # Visualize the loss
# from nndl_3 import plot
# plot(runner, 'cnn-loss1.pdf')
#
# # Load the best model
# runner.load_model('best_model.pdparams')
# # Evaluate the model
# score, loss = runner.evaluate(test_loader)
# print("[Test] accuracy/loss: {:.4f}/{:.4f}".format(score, loss))
# print("=================================================================")

# Train with the built-in PyTorch operators =====================================
print("Training with the framework operators: =======================================")
time3 = time.time()
model = PyTorch_LeNet(in_channels=1, num_classes=10)
# Define the optimizer
lr = 0.1
optimizer = opt.SGD(lr=lr, params=model.parameters())
# Define the loss function
loss_fn = F.cross_entropy
# Define the evaluation metric
metric = Accuracy(is_logist=True)
# Instantiate RunnerV3 with the training configuration
runner = RunnerV3(model, optimizer, loss_fn, metric)
# Start training
log_steps = 15
eval_steps = 15
runner.train(train_loader, dev_loader, num_epochs=10, log_steps=log_steps,
             eval_steps=eval_steps, save_path="best_model.pdparams")
time4 = time.time()
print("Running time with the framework operators:", time4 - time3)
# Visualize the loss
from nndl_3 import plot
plot(runner, 'cnn-loss1.pdf')
# Load the best model
runner.load_model('best_model.pdparams')
# Evaluate the model
score, loss = runner.evaluate(test_loader)
print("[Test] accuracy/loss: {:.4f}/{:.4f}".format(score, loss))

# =================================================================
# Parameter count
from torchsummary import summary

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
Torch_model = PyTorch_LeNet(in_channels=1, num_classes=10).to(device)
summary(Torch_model, (1, 32, 32))

# ==================================================================
# Computational cost: torchprofile counts multiply-accumulate operations
# (MACs); FLOPs is roughly twice the MAC count
model = PyTorch_LeNet(in_channels=1, num_classes=10)
dummy_input = torch.randn(1, 1, 32, 32)
macs = torchprofile.profile_macs(model, dummy_input)
print(f"MACs: {macs}")

# =======================================================
# Model prediction
# Fetch the first batch of the test set
X, label = next(iter(test_loader))
logits = runner.predict(X)
# Multi-class: use softmax to turn logits into probabilities
pred = F.softmax(logits, dim=1)
# Take the class with the highest probability for sample index 2
pred_class = torch.argmax(pred[2]).numpy()
label = label[2].numpy()
# Print the true and the predicted class
print("The true category is {} and the predicted category is {}".format(label, pred_class))
# Visualize the image
plt.figure(figsize=(2, 2))
image, label = test_images[2], test_labels[2]
image = np.array(image).astype('float32')
image = np.reshape(image, [28, 28])
image = Image.fromarray(image.astype('uint8'), mode='L')
plt.imshow(image)
plt.show()

nndl_3.py:

import torch
from matplotlib import pyplot as plt
from torch import nn


class Op(object):
    def __init__(self):
        pass

    def __call__(self, inputs):
        return self.forward(inputs)

    def forward(self, inputs):
        raise NotImplementedError

    def backward(self, inputs):
        raise NotImplementedError


# A two-layer feed-forward neural network
class Model_MLP_L2_V3(torch.nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Model_MLP_L2_V3, self).__init__()
        self.fc1 = torch.nn.Linear(input_size, hidden_size)
        w_ = torch.normal(0, 0.01, size=(hidden_size, input_size), requires_grad=True)
        self.fc1.weight = torch.nn.Parameter(w_)
        self.fc1.bias = torch.nn.init.constant_(self.fc1.bias, val=1.0)
        self.fc2 = torch.nn.Linear(hidden_size, output_size)
        w2 = torch.normal(0, 0.01, size=(output_size, hidden_size), requires_grad=True)
        self.fc2.weight = nn.Parameter(w2)
        self.fc2.bias = torch.nn.init.constant_(self.fc2.bias, val=1.0)
        self.act = torch.sigmoid

    def forward(self, inputs):
        outputs = self.fc1(inputs)
        outputs = self.act(outputs)
        outputs = self.fc2(outputs)
        return outputs


class RunnerV3(object):
    def __init__(self, model, optimizer, loss_fn, metric, **kwargs):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.metric = metric  # used only to compute the evaluation metric
        # Record how the evaluation metric changes during training
        self.dev_scores = []
        # Record how the loss changes during training
        self.train_epoch_losses = []  # one loss record per epoch
        self.train_step_losses = []   # one loss record per step
        self.dev_losses = []
        # Track the best metric seen so far
        self.best_score = 0

    def train(self, train_loader, dev_loader=None, **kwargs):
        # Switch the model to training mode
        self.model.train()
        # Number of epochs; defaults to 0 if not given
        num_epochs = kwargs.get("num_epochs", 0)
        # Logging frequency; defaults to 100 if not given
        log_steps = kwargs.get("log_steps", 100)
        # Evaluation frequency
        eval_steps = kwargs.get("eval_steps", 0)
        # Path for saving the model; defaults to "best_model.pdparams"
        save_path = kwargs.get("save_path", "best_model.pdparams")
        custom_print_log = kwargs.get("custom_print_log", None)
        # Total number of training steps
        num_training_steps = num_epochs * len(train_loader)
        if eval_steps:
            if self.metric is None:
                raise RuntimeError('Error: Metric can not be None!')
            if dev_loader is None:
                raise RuntimeError('Error: dev_loader can not be None!')
        # Number of steps run so far
        global_step = 0
        # Train for num_epochs epochs
        for epoch in range(num_epochs):
            # Accumulate the training-set loss
            total_loss = 0
            for step, data in enumerate(train_loader):
                X, y = data
                # Get the model predictions
                logits = self.model(X)
                loss = self.loss_fn(logits, y)  # mean reduction by default
                total_loss += loss
                # Save the loss of every step during training
                self.train_step_losses.append((global_step, loss.item()))
                if log_steps and global_step % log_steps == 0:
                    print(f"[Train] epoch: {epoch}/{num_epochs}, step: {global_step}/{num_training_steps}, loss: {loss.item():.5f}")
                # Back-propagate to compute the gradient of every parameter
                loss.backward()
                if custom_print_log:
                    custom_print_log(self)
                # Mini-batch gradient descent parameter update
                self.optimizer.step()
                # Zero the gradients
                self.optimizer.zero_grad()
                # Decide whether to evaluate
                if eval_steps > 0 and global_step > 0 and \
                        (global_step % eval_steps == 0 or global_step == (num_training_steps - 1)):
                    dev_score, dev_loss = self.evaluate(dev_loader, global_step=global_step)
                    print(f"[Evaluate]  dev score: {dev_score:.5f}, dev loss: {dev_loss:.5f}")
                    # Switch the model back to training mode
                    self.model.train()
                    # If the current metric is the best so far, save the model
                    if dev_score > self.best_score:
                        self.save_model(save_path)
                        print(f"[Evaluate] best accuracy performance has been updated: {self.best_score:.5f} --> {dev_score:.5f}")
                        self.best_score = dev_score
                global_step += 1
            # Accumulated training loss for the current epoch
            trn_loss = (total_loss / len(train_loader)).item()
            # Save the epoch-level training loss
            self.train_epoch_losses.append(trn_loss)
        print("[Train] Training done!")

    # Evaluation: torch.no_grad() keeps gradients from being computed or stored
    @torch.no_grad()
    def evaluate(self, dev_loader, **kwargs):
        assert self.metric is not None
        # Switch the model to evaluation mode
        self.model.eval()
        global_step = kwargs.get("global_step", -1)
        # Accumulate the loss
        total_loss = 0
        # Reset the metric
        self.metric.reset()
        # Iterate over every batch of the dev set
        for batch_id, data in enumerate(dev_loader):
            X, y = data
            # Compute the model output
            logits = self.model(X)
            # Compute the loss
            loss = self.loss_fn(logits, y).item()
            # Accumulate the loss
            total_loss += loss
            # Accumulate the metric
            self.metric.update(logits, y)
        dev_loss = (total_loss / len(dev_loader))
        dev_score = self.metric.accumulate()
        # Record the dev loss
        if global_step != -1:
            self.dev_losses.append((global_step, dev_loss))
            self.dev_scores.append(dev_score)
        return dev_score, dev_loss

    # Prediction: torch.no_grad() keeps gradients from being computed or stored
    @torch.no_grad()
    def predict(self, x, **kwargs):
        # Switch the model to evaluation mode
        self.model.eval()
        # Run the forward pass to get the predictions
        logits = self.model(x)
        return logits

    def save_model(self, save_path):
        torch.save(self.model.state_dict(), save_path)

    def load_model(self, model_path):
        model_state_dict = torch.load(model_path)
        self.model.load_state_dict(model_state_dict)


class Accuracy():
    def __init__(self, is_logist=True):
        # Number of correctly classified samples
        self.num_correct = 0
        # Total number of samples
        self.num_count = 0
        self.is_logist = is_logist

    def update(self, outputs, labels):
        if outputs.shape[1] == 1:  # binary classification
            outputs = torch.squeeze(outputs, dim=-1)
            if self.is_logist:
                # For logits, check whether the value is >= 0
                preds = torch.tensor((outputs >= 0), dtype=torch.float32)
            else:
                # Otherwise treat outputs as probabilities: class 1 if >= 0.5, else class 0
                preds = torch.tensor((outputs >= 0.5), dtype=torch.float32)
        else:
            # For multi-class, take the index of the largest element as the class
            preds = torch.argmax(outputs, dim=1)
        # Count the correctly predicted samples in this batch
        labels = torch.squeeze(labels, dim=-1)
        batch_correct = torch.sum(torch.tensor(preds == labels, dtype=torch.float32)).numpy()
        batch_count = len(labels)
        # Update num_correct and num_count
        self.num_correct += batch_correct
        self.num_count += batch_count

    def accumulate(self):
        # Compute the overall metric from the accumulated counts
        if self.num_count == 0:
            return 0
        return self.num_correct / self.num_count

    def reset(self):
        # Reset the correct count and the total count
        self.num_correct = 0
        self.num_count = 0

    def name(self):
        return "Accuracy"


# Visualization
def plot(runner, fig_name):
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    train_items = runner.train_step_losses[::30]
    train_steps = [x[0] for x in train_items]
    train_losses = [x[1] for x in train_items]
    plt.plot(train_steps, train_losses, color='#8E004D', label="Train loss")
    if runner.dev_losses[0][0] != -1:
        dev_steps = [x[0] for x in runner.dev_losses]
        dev_losses = [x[1] for x in runner.dev_losses]
        plt.plot(dev_steps, dev_losses, color='#E20079', linestyle='--', label="Dev loss")
    # Axes and legend
    plt.ylabel("loss", fontsize='x-large')
    plt.xlabel("step", fontsize='x-large')
    plt.legend(loc='upper right', fontsize='x-large')
    plt.subplot(1, 2, 2)
    # Plot the dev accuracy curve
    if runner.dev_losses[0][0] != -1:
        plt.plot(dev_steps, runner.dev_scores,
                 color='#E20079', linestyle="--", label="Dev accuracy")
    else:
        plt.plot(list(range(len(runner.dev_scores))), runner.dev_scores,
                 color='#E20079', linestyle="--", label="Dev accuracy")
    # Axes and legend
    plt.ylabel("score", fontsize='x-large')
    plt.xlabel("step", fontsize='x-large')
    plt.legend(loc='lower right', fontsize='x-large')
    plt.savefig(fig_name)
    plt.show()

That's all for this post. See you next time!

