11.2 Linear Neural Networks
11.2.1 Linear Regression
nn.Linear is a linear layer that applies an affine transformation \(y=XW^T+b\) to its input, where \(W\) is the weight matrix and \(b\) is the bias. A simple linear regression model therefore needs only a single linear layer.
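To make the affine form concrete, here is a minimal sketch (the tensor names are illustrative) verifying that nn.Linear computes exactly \(XW^T+b\):

import torch
import torch.nn as nn

layer = nn.Linear(2, 1)  # 2 input features, 1 output
X = torch.randn(5, 2)    # a batch of 5 samples
manual = X @ layer.weight.T + layer.bias  # the affine transform written out by hand
print(torch.allclose(layer(X), manual))   # True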
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
torch.manual_seed(123) # set the global random seed
X = torch.randn(100,2) # samples from the standard normal distribution
beta = torch.rand(2) # samples from the uniform distribution on [0,1)
intercept = torch.rand(1)
y = intercept + torch.matmul(X, beta) + torch.randn(100)*0.1
y = y.reshape(-1,1)
dataset = TensorDataset(X,y)
dataloader = DataLoader(dataset, batch_size=10, shuffle=True)
# build the network
linear_model = nn.Sequential(
    nn.Linear(2, 1)
)
criterion = nn.MSELoss()
optimizer = optim.SGD(linear_model.parameters(), lr=0.05) # lr is the learning rate
for epoch in range(100): # 100 epochs
    linear_model.train() # training mode
    for batch_X, batch_y in dataloader:
        y_pred = linear_model(batch_X) # forward pass: compute predictions
        loss = criterion(y_pred, batch_y) # compute the loss
        optimizer.zero_grad() # zero out the gradients
        loss.backward() # backward pass: compute gradients
        optimizer.step() # update the model parameters
    linear_model.eval() # evaluation mode
    with torch.no_grad():
        epoch_loss = criterion(linear_model(X), y)
    if (epoch + 1) % 10 == 0:
        print(f'Epoch_{epoch + 1}: {epoch_loss.item():.6f}')
print(linear_model) # inspect the network structure
print(linear_model[0].weight.data) # weights of layer 0
print(linear_model[0].bias.data) # bias of layer 0
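Since the data were simulated, a quick sanity check (not part of the original workflow; the tolerance atol=0.1 is an arbitrary choice) is to compare the fitted parameters against the true beta and intercept:

with torch.no_grad():
    print(torch.allclose(linear_model[0].weight.view(-1), beta, atol=0.1))  # weights close to beta
    print(torch.allclose(linear_model[0].bias, intercept, atol=0.1))        # bias close to intercept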
Notes:

DataLoader() splits the dataset into batches and turns it into an iterable that yields one batch at a time; it can also be combined with enumerate(). nn.Sequential() stacks several layers in order; compared with a custom class it requires no forward method, which makes it convenient for building simple models. Besides nn.Sequential(), a model can also be defined by subclassing nn.Module, which allows more complex architectures.

nn.Linear() is a fully connected linear layer implementing the affine transformation \(y=XW^T+b\). nn.MSELoss() specifies the loss function: the MSE between two tensors of the same shape. optim.SGD() selects stochastic gradient descent as the optimization algorithm, with linear_model.parameters() passing in the parameters to be optimized.

.train() and .eval() switch the model between training and evaluation modes; the mode changes the behavior of certain layers (such as Dropout). Typically .train() is used during training and .eval() when computing evaluation metrics. In particular, evaluation can be combined with torch.no_grad() to disable gradient tracking, which saves memory and computation and thus speeds things up.
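As a sketch of the subclassing approach just mentioned, the same one-layer model could be written as a custom nn.Module (the class name is hypothetical, chosen for illustration):

class LinearRegressionNet(nn.Module):  # hypothetical class name
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(2, 1)

    def forward(self, x):  # subclasses define the forward computation explicitly
        return self.linear(x)

alt_model = LinearRegressionNet()  # interchangeable with the nn.Sequential version above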
11.2.2 Binary Classification
For binary classification, the output layer should have dimension 1, with a Sigmoid activation on the output, and the loss should be nn.BCELoss() or another function suited to the binary setting. nn.BCELoss() computes \(-y_i \log \hat y_i - (1-y_i)\log (1-\hat y_i)\).
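A minimal check of nn.BCELoss() against this formula (the numbers are made up for illustration):

import torch
import torch.nn as nn

y_hat = torch.tensor([0.9, 0.2, 0.7])  # predicted probabilities
y = torch.tensor([1.0, 0.0, 1.0])      # true labels
manual = (-y * torch.log(y_hat) - (1 - y) * torch.log(1 - y_hat)).mean()
print(torch.allclose(nn.BCELoss()(y_hat, y), manual))  # True: BCELoss averages over the batch by default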
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
from torch.utils.data import TensorDataset, DataLoader
# set the random seeds
torch.manual_seed(123)
np.random.seed(321)
def gen_data(n_samples=2000, n_features=10, n_classes=2):
    # generate a harder classification dataset (with class overlap)
    X, y = make_classification(
        n_samples=n_samples,    # number of samples
        n_features=n_features,  # number of features
        n_informative=8,        # number of informative features
        n_redundant=2,          # number of redundant features
        n_repeated=0,           # number of repeated features
        n_classes=n_classes,    # number of classes
        flip_y=0.15,            # 15% label noise
        class_sep=0.8           # degree of class separation
    )
    # split into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    # standardize the data
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)
    # convert to PyTorch tensors
    X_train = torch.tensor(X_train, dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)  # binary classification needs 2-D labels
    X_test = torch.tensor(X_test, dtype=torch.float32)
    y_test = torch.tensor(y_test, dtype=torch.float32).view(-1, 1)
    return X_train, X_test, y_train, y_test
# generate the data
X_train, X_test, y_train, y_test = gen_data()
# create the data loaders
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
model = nn.Sequential(
    nn.Linear(10, 64),
    nn.ReLU(),
    nn.BatchNorm1d(64),  # normalizes the batch, then rescales and shifts it; helps mitigate "internal covariate shift"
    nn.Dropout(0.2),     # drops neurons with a given probability to reduce overfitting
    nn.Linear(64, 32),
    nn.ReLU(),
    nn.BatchNorm1d(32),
    nn.Dropout(0.2),
    nn.Linear(32, 1),
    nn.Sigmoid()
)
criterion = nn.BCELoss()
optimizer = optim.SGD(model.parameters(), lr=0.05)
# move the model to the GPU if one is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
# containers for training metrics
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []
for epoch in range(100):
    # training mode
    model.train()
    running_loss = 0.0
    correct_train = 0
    total_train = 0
    for batch_x, batch_y in train_loader:
        # move the data to the device
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        # forward pass
        outputs = model(batch_x)
        # compute the loss
        loss = criterion(outputs, batch_y)
        # backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # accumulate training statistics
        running_loss += loss.item()
        # compute accuracy
        predictions = (outputs > 0.5).float()
        correct_train += (predictions == batch_y).sum().item()
        total_train += batch_y.size(0)
    # average loss and accuracy for this epoch
    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct_train / total_train if total_train > 0 else 0
    train_losses.append(epoch_loss)
    train_accuracies.append(epoch_acc)
    # validation
    model.eval()
    with torch.no_grad():
        # move the test data to the device
        test_x, test_y = X_test.to(device), y_test.to(device)
        # forward pass
        outputs = model(test_x)
        # compute the loss
        val_loss = criterion(outputs, test_y).item()
        # compute predictions
        predictions = (outputs > 0.5).float()
        # compute accuracy
        correct_val = (predictions == test_y).sum().item()
        total_val = test_y.size(0)
        val_acc = correct_val / total_val
    # store the validation results
    val_losses.append(val_loss)
    val_accuracies.append(val_acc)
    # print progress every 10 epochs
    if (epoch + 1) % 10 == 0:
        print("="*10,
              f"Epoch_{epoch+1}",
              "="*10,
              f"\nTrain Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f}\n",
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
Notes:

When converting the data to tensors, make both the features and the labels floating point from the start; otherwise float features paired with integer labels will raise an error later. The labels must also be reshaped to two dimensions.

nn.BatchNorm1d() normalizes the current batch and then rescales and shifts it, which helps mitigate "internal covariate shift". nn.Dropout() drops neurons with a given probability during training (model.train()) to reduce overfitting; it is a regularization technique.

In all cases, the model and the data must be on the same device. A tensor must be reassigned, as in batch_x = batch_x.to(device), whereas a module can simply be moved with model.to(device), as shown in the sketch below.
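A small sketch of both points (it also runs on CPU, since .to() is a no-op when the tensor is already on the target device):

import torch
import torch.nn as nn

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

x = torch.randn(4, 3)
x.to(device)      # returns a new tensor; x itself does not move
x = x.to(device)  # the reassignment is what actually moves x

drop = nn.Dropout(0.5).to(device)  # modules are moved in place
drop.train()
print(drop(x))  # training mode: about half the entries are zeroed (the rest scaled by 2)
drop.eval()
print(drop(x))  # evaluation mode: dropout is disabled and x passes through unchanged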
11.2.3 Multi-class Classification
For multi-class problems, the output layer's dimension equals the number of classes, and there is no need to add a softmax function, because the cross-entropy loss nn.CrossEntropyLoss() performs the softmax internally.
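A minimal sketch confirming this equivalence: applying nn.CrossEntropyLoss() to raw logits matches log-softmax followed by the negative log-likelihood loss.

import torch
import torch.nn as nn

logits = torch.randn(4, 5)           # raw outputs: 4 samples, 5 classes
labels = torch.tensor([0, 2, 4, 1])  # integer class labels
manual = nn.NLLLoss()(torch.log_softmax(logits, dim=1), labels)
print(torch.allclose(nn.CrossEntropyLoss()(logits, labels), manual))  # True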
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
from torch.utils.data import TensorDataset, DataLoader
# set the random seeds
torch.manual_seed(123)
np.random.seed(321)
def gen_data(n_samples=2000, n_features=10, n_classes=5):
    # generate a harder classification dataset (with class overlap)
    X, y = make_classification(
        n_samples=n_samples,    # number of samples
        n_features=n_features,  # number of features
        n_informative=8,        # number of informative features
        n_redundant=2,          # number of redundant features
        n_repeated=0,           # number of repeated features
        n_classes=n_classes,    # number of classes
        flip_y=0.15,            # 15% label noise
        class_sep=0.8           # degree of class separation
    )
    # split into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    # standardize the data
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)
    # convert to PyTorch tensors
    X_train = torch.tensor(X_train, dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.long)  # the cross-entropy loss expects integer labels
    X_test = torch.tensor(X_test, dtype=torch.float32)
    y_test = torch.tensor(y_test, dtype=torch.long)
    return X_train, X_test, y_train, y_test
# generate the data for the multi-class task
X_train, X_test, y_train, y_test = gen_data()
# create the data loaders
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
model = nn.Sequential(
    nn.Linear(10, 128),
    nn.ReLU(),
    nn.BatchNorm1d(128),  # normalizes the batch, then rescales and shifts it; helps mitigate "internal covariate shift"
    nn.Dropout(0.2),      # drops neurons with a given probability to reduce overfitting
    nn.Linear(128, 64),
    nn.ReLU(),
    nn.BatchNorm1d(64),
    nn.Dropout(0.2),
    nn.Linear(64, 32),
    nn.ReLU(),
    nn.BatchNorm1d(32),
    nn.Dropout(0.2),
    nn.Linear(32, 5)  # output dimension equals the number of classes
)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.05)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
# containers for training metrics
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []
for epoch in range(100):
    # training mode
    model.train()
    running_loss = 0.0
    correct_train = 0
    total_train = 0
    for batch_x, batch_y in train_loader:
        # move the data to the device
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        # forward pass
        outputs = model(batch_x)
        # compute the loss
        loss = criterion(outputs, batch_y)
        # backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # accumulate training statistics
        running_loss += loss.item()
        # compute accuracy
        predictions = torch.argmax(outputs, dim=1)  # the class with the largest logit is the prediction
        correct_train += (predictions == batch_y).sum().item()
        total_train += batch_y.size(0)
    # average loss and accuracy for this epoch
    epoch_loss = running_loss / len(train_loader)
    epoch_acc = correct_train / total_train if total_train > 0 else 0
    train_losses.append(epoch_loss)
    train_accuracies.append(epoch_acc)
    # validation
    model.eval()
    with torch.no_grad():
        # move the test data to the device
        test_x, test_y = X_test.to(device), y_test.to(device)
        # forward pass
        outputs = model(test_x)
        # compute the loss
        val_loss = criterion(outputs, test_y).item()
        # compute predictions
        predictions = torch.argmax(outputs, dim=1)
        # compute accuracy
        correct_val = (predictions == test_y).sum().item()
        total_val = test_y.size(0)
        val_acc = correct_val / total_val
    # store the validation results
    val_losses.append(val_loss)
    val_accuracies.append(val_acc)
    # print progress every 10 epochs
    if (epoch + 1) % 10 == 0:
        print("="*10,
              f"Epoch_{epoch+1}",
              "="*10,
              f"\nTrain Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f}\n",
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
Notes:

nn.CrossEntropyLoss() takes the raw logits together with the labels. Since the logits are the network's raw outputs, no Softmax activation should be added at the output layer: nn.CrossEntropyLoss() applies Softmax internally, so adding it again would duplicate the computation. The labels must be integers and one-dimensional; one-hot encoding is not needed. To obtain probabilities, apply torch.softmax(outputs, dim=1) to the logits afterwards; to obtain predicted classes, use torch.argmax(outputs, dim=1).
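For instance, reusing the names from the code above, the validation logits can be turned into probabilities and class predictions like this (a usage sketch):

model.eval()
with torch.no_grad():
    outputs = model(X_test.to(device))     # raw logits
    probs = torch.softmax(outputs, dim=1)  # class probabilities; each row sums to 1
    preds = torch.argmax(outputs, dim=1)   # predicted class indices
print(probs[:3])
print(preds[:3])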