教材链接-3.2. 线性回归的从零开始实现
c++实现
该博客仅用于记录一下自己的代码,可与c++实现作为对照
python">from d2l import torch as d2l
import torch
import random
# nn是神经网络的缩写
from torch import nn
from torch.utils import data# 加载训练数据
# 加载训练数据集
simples = torch.load('datas.pt')
# 这里是加载了训练和测试数据集的真实权重和偏差,仅作为最后训练结果的验证使用
tw, tb = torch.load('wb.pt')
# 加载测试数据集
tests = torch.load('test.pt')
# 获取训练数据集的样本数量
simple_num = simples.shape[0]# 获取数据读取迭代器
def data_iter(batch_size, features, labels):# 计算数据的总数量num_examples = len(features)# 创建一个包含数据索引的列表 indices = list(range(num_examples))# 随机打乱索引列表,以实现随机读取样本,对训练结果意义不明# random.shuffle(indices)# 遍历打乱后的indices,每次取出batch_size个索引,用于构建一个小批量数据 for i in range(0, num_examples, batch_size):# 获取当前批次的索引号并以张量形式存储batch_indices = torch.tensor(indices[i: min(i + batch_size, num_examples)])# 根据索引从特征和标签中提取数据 yield features[batch_indices], labels[batch_indices]
# 在Python中,yield 是一个关键字,用于定义一个生成器(generator)。生成器是一种特殊的迭代器,它允许你定义一个可以记住上一次返回时在函数体中的位置的函数。对生成器函数的第二次(或第n次)调用将恢复函数的执行,并继续从上次挂起的位置开始。# 定义一个函数来加载并批量处理数据,返回数据获取迭代器
def load_array(data_arrays, batch_size, is_train=True): #@save"""构造一个PyTorch数据迭代器"""# 使用TensorDataset将多个tensor组合成一个数据集 dataset = data.TensorDataset(*data_arrays)# 使用DataLoader加载数据集,并指定批量大小和是否打乱数据return data.DataLoader(dataset, batch_size, shuffle=is_train)# 定义线性回归模型
def linreg(X, w, b): #@save"""线性回归模型"""# 使用矩阵乘法计算预测值,并加上偏差 return torch.matmul(X, w) + b# 定义平方损失函数
def squared_loss(y_hat, y): #@save"""均方损失"""# 计算预测值与实际值之间的平方差,并除以2(方便梯度计算)return (y_hat - y.reshape(y_hat.shape)) ** 2 / 2# 定义交叉熵损失函数,线性回归模型用不到
def cross_entropy(y_hat, y):return - torch.log(y_hat[range(len(y_hat)), y])# 定义一个鲁棒的损失函数,结合了平方损失和绝对值损失
def robust_loss(y_hat, y, delta=1.0):residual = torch.abs(y_hat - y)return torch.where(residual<delta, 0.5* residual **2, delta*(residual-0.5*delta))# 绝对值损失函数
def abs_loss(y_hat, y):return torch.abs(y_hat - y.reshape(y_hat.shape))# 定义随机梯度下降函数
def sgd(params, lr, batch_size): #@save"""小批量随机梯度下降"""with torch.no_grad():# 遍历模型参数 for param in params:# 更新参数值,使用学习率lr乘以参数的梯度,并除以批量大小 param -= lr * param.grad / batch_size# 清除参数的梯度,为下一轮迭代做准备 param.grad.zero_()# 数据标准化处理
def standard(X):X_mean = torch.mean(X, dim=0)X_std = torch.std(X, dim=0)return (X-X_mean)/X_std# 数据最小最大归一化处理
def min_max(X):X_min = torch.min(X, dim=0)[0]X_max = torch.max(X, dim=0)[0]return (X-X_min)/(X_max-X_min)# 不进行任何处理,直接返回输入
def noProcess(X):return X
#Linear Regression Implementation from Scratch
if __name__ == '__main__':# 设置学习率和训练轮数 lr = 0.03num_epochs = 20# 这里其实net变量并没有定义为一个神经网络模型,而是一个函数 # 但为了与后续代码保持一致,我们仍然使用net来表示这个线性回归函数# loss同理net = linregloss = squared_loss# 使用不进行任何处理的数据处理方式 data_process = noProcess# 将数据分成50个批次,计算每批数据的数量 batch_size = simple_num // 50# 提取特征和标签 # 提取最后一列作为标签 label = simples[:,-1]# 提取除最后一列外的所有列作为特征,并使用data_process进行处理feature=data_process(simples[:, :-1])# 初始化权重和偏差,权重使用正态分布初始化,偏差初始化为0 w = torch.normal(0, 1, size=(feature.shape[1], 1), requires_grad=True)# w = torch.tensor([0.3], requires_grad=True)b = torch.tensor([0.0], requires_grad=True)timer = d2l.Timer()# 开始训练 for epoch in range(num_epochs):# 通过data_iter遍历数据进行一轮训练for X,y in data_iter(batch_size, feature, label):# 计算预测值y_hat = net(X, w, b)# 计算损失l = loss(y_hat, y)# 反向传播计算梯度 l.sum().backward()# 使用随机梯度下降更新参数sgd([w,b], lr, batch_size)# 一轮训练结束后,计算整个训练集上的损失,用以监控训练效果# with torch.no_grad(): 告诉 PyTorch 在这个上下文内不要计算梯度,从而节省内存并加速计算。with torch.no_grad():label_hat = net(feature, w, b)epoch_loss = loss(label_hat, label)if epoch%5 == 0:print(f'in epoch{epoch+1}, loss is {epoch_loss.sum()}')# 在训练完成后,计算测试集上的预测值和损失 # 提取测试集的特征和标签 test_feature = data_process(tests[:, :-1])test_label = tests[:, -1]# 计算测试集上的预测值和损失 test_label_hat = net(test_feature, w, b)label_loss = loss(test_label_hat, test_label)print(f'in test epoch, loss is {label_loss.mean()}')print(f'true_w={tw}, true_b={tb}, w={w}, b={b}')print(f' {num_epochs} epoch, time {timer.stop():.2f} sec')
#Concise Implementation of Linear Regression
#the concise implementation have lower accuracy than from scratch
if __name__ == '__main2__':# 设置学习率、训练轮数、数据处理方式和批量大小 lr = 0.03num_epochs = 15# 使用不进行任何处理的数据处理方式 data_process = noProcess# 将数据分成50个批次,计算每批数据的数量 batch_size = simple_num // 50# 提取特征和标签 label = simples[:,-1]feature=data_process(simples[:, :-1])# 加载数据并创建数据迭代器 data_iter = load_array((feature, label), batch_size)# 构建神经网络模型,这里是一个简单的线性回归模型 net = nn.Sequential(nn.Linear(feature.shape[1], 1))# 我们的模型只包含一个层,因此实际上不需要Sequential# 不使用Sequential时,后面的net[0]需要改为net# net = nn.Linear(feature.shape[1], 1)# 初始化网络权重和偏置 net[0].weight.data.normal_(0, 0.01)net[0].bias.data.fill_(0)# 使用均方误差损失函数loss = nn.MSELoss()# 使用随机梯度下降优化器 trainer = torch.optim.SGD(net.parameters(), lr=lr)# 开始训练 for epoch in range(num_epochs):# 通过data_iter遍历数据进行一轮训练for X, y in data_iter:# 前向传播计算预测值y_hat = net(X)# 计算损失 l = loss(y_hat, y.reshape(y_hat.shape))# 梯度清零,为下一轮迭代计算做准备trainer.zero_grad()# 反向传播计算梯度 l.backward()# 使用随机梯度下降更新参数trainer.step()# 在每个epoch结束后,对整个数据集进行前向传播并计算损失,用于监控训练过程 label_hat = net(feature)epoch_loss = loss(label_hat, label.reshape(label_hat.shape))if epoch%5 == 0:print(f'in epoch{epoch+1}, loss is {epoch_loss.mean()}')# 在训练完成后,计算测试集上的预测值和损失 # 提取测试集的特征和标签 test_feature = data_process(tests[:, :-1])test_label = tests[:, -1]# 计算测试集上的预测值和损失 test_label_hat = net(test_feature)label_loss = loss(test_label_hat, test_label.reshape(test_label_hat.shape))print(f'in test epoch, loss is {label_loss.mean():f}')print(f'tw={tw}, tb={tb}, w={net[0].weight.data}, b={net[0].bias.data}')