在移动智能终端品类越发多样的时代,为了让模型可以顺利部署在算力和存储空间都受限的移动终端,对模型进行压缩尤为重要。模型压缩(model compression)可以降低神经网络参数量,减少延迟时间,从而实现提高神经网络推理速度、节省存储空间等目的。
一.量化
量化是指将模型权重参数用更少的比特数存储,以此来减少模型的存储空间和算力消耗。
1.基本原理
(1) 量化感知训练
Quantization-aware Training,QAT在训练过程中模拟量化过程,数据虽然表示为float32,但实际的值的间隔却会受到量化参数的设置。
QAT的具体流程如下:
1)初始化:设置权重和激活值范的范围和的初始值;
2)构建模拟量化网络:在需要量化的权重和激活值后插入伪量化算子;
3)量化训练:重复执行以下步骤直至网络收敛(计算量化网络层的权重和激活值的范围和,并根据该范围将量化损失带入到前向推理和后向参数更新的过程中);
4)导出量化网络:获取和,并计算量化参数,将量化参数s和z代入到量化公式中,转换网络中的权重为量化整数值;删除伪量化算子,在量化网络层前后分别插入量化和反量化算子。
(2) 后训练动态量化
Post training dynamic quantization是在浮点模型训练收敛之后进行量化操作,weight被提前量化,activation在前向推理过程中被动态量化(即每次都要根据实际运算的浮点数据范围每一层计算1次scale和zero_point,然后进行量化)。
在量化激活值时会以校准数据集为输入,执行推理流程然后统计每层激活值的数据分布并得到相应的量化参数,具体操作流程如下:
1)使用直方图统计的方式得到原始float32数据的统计分布;
2)在给定的搜索空间中选取若干个和分别对激活值进行量化,得到量化后的数据;
3)使用直方图统计得到的统计分布;
4)计算每个与的统计分布差异,并找到差异性最低的1个对应的和来计算相应的量化参数;常用的用于度量分布差异的指标包括KL散度、对称KL散度和JS散度。
(3) 后训练静态量化
activation会基于之前校准过程中记录下的固定的scale和zero_point进行量化,整个过程不存在量化参数(scale,zero_point)的再计算。
2.代码实例
(1) 加载数据
python">import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as F
from torch.quantization import QuantStub, DeQuantStub
import torch.optim as optim
from torch.quantization import get_default_qconfig, prepare_qat, convert# 定义数据预处理
transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) # 标准化
])# 加载训练集和测试集
trainset = torchvision.datasets.CIFAR10(root='./data', train=True,download=True, transform=transform)
trainloader = DataLoader(trainset, batch_size=64, shuffle=True, num_workers=2)
testset = torchvision.datasets.CIFAR10(root='./data', train=False,download=True, transform=transform)
testloader = DataLoader(testset, batch_size=64, shuffle=False, num_workers=2)
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
(2) 构建量化网络
python">class QuantizedCNN(nn.Module):def __init__(self):super(QuantizedCNN, self).__init__()self.quant = QuantStub()self.conv1 = nn.Conv2d(3, 16, 5)self.pool = nn.MaxPool2d(2, 2)self.conv2 = nn.Conv2d(16, 32, 5)self.fc1 = nn.Linear(32 * 5 * 5, 120)self.fc2 = nn.Linear(120, 84)self.fc3 = nn.Linear(84, 10)self.dequant = DeQuantStub()def forward(self, x):# x = self.quant(x)x = self.pool(F.relu(self.conv1(x)))x = self.pool(F.relu(self.conv2(x)))x = torch.flatten(x, 1)x = F.relu(self.fc1(x))x = F.relu(self.fc2(x))x = self.fc3(x)x = self.dequant(x)return xdevice = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = QuantizedCNN().to(device)
model.qconfig = get_default_qconfig('qnnpack')
(3) 量化训练并保存模型
python"># 训练循环
num_epochs = 10
for epoch in range(num_epochs):model.train()running_loss = 0.0for i, data in enumerate(trainloader, 0):inputs, labels = data[0].to(device), data[1].to(device)optimizer.zero_grad()outputs = model(inputs)loss = criterion(outputs, labels)loss.backward()optimizer.step()running_loss += loss.item()if i % 2000 == 1999:print(f'[{epoch + 1}, {i + 1}] loss: {running_loss / 2000:.3f}')running_loss = 0.0# 切换到评估模式进行测试model.eval()correct = 0total = 0with torch.no_grad():for data in testloader:images, labels = data[0].to(device), data[1].to(device)outputs = model(images)_, predicted = torch.max(outputs.data, 1)total += labels.size(0)correct += (predicted == labels).sum().item()print('Accuracy of the network on the 10000 test images: %d %%' % (100 * correct / total))# 在最后1个epoch后完成量化if epoch == num_epochs - 1:model_quantized = convert(model.eval(), inplace=True)print("Model quantization completed.")# 保存量化模型torch.save(model_quantized.state_dict(), 'quantized_model.pth')
(4) 模型测试
python">def test_quantized_model(model, dataloader, device='cpu'):model = convert(model.eval(), inplace=True)model.to(device) correct = 0total = 0with torch.no_grad(): for data, targets in dataloader:data, targets = data.to(device), targets.to(device) outputs = model(data) _, predicted = torch.max(outputs.data, 1) total += targets.size(0)correct += (predicted == targets).sum().item()accuracy = 100 * correct / totalprint(f'Accuracy of the quantized model on the test data: {accuracy:.2f}%')# 测试模型
quantized_model=QuantizedCNN()
quantized_model.load_state_dict(torch.load('quantized_model.pth'))
test_quantized_model(quantized_model, test_loader, device='cuda' if torch.cuda.is_available() else 'cpu'
二.剪枝
剪枝是指去除模型参数中冗余或不重要的部分,可以高效地生成规模更小、内存利用率更高、能耗更低、推断速度更快的模型。
1.基本原理
根据剪枝流程的位置,可以将剪枝操作分为2种:训练时剪枝和后剪枝。
(1) 训练时剪枝
和训练时使用dropout操作较为类似,训练时剪枝会根据当前模型的结果,删除不重要的结构,固化模型再进行训练,以后续的训练来弥补部分结构剪枝带来的不利影响。
(2) 后剪枝
在模型训练完成后,根据模型权重参数和剪枝测试选取需要剪枝的部分。
2.代码实例
(1) 加载预训练模型
python">import torch
import torchvision.models as models# 加载预训练的ResNet18模型
model = models.resnet18(pretrained=True)
(2) 定义剪枝算法
python">from torch.nn.utils.prune import global_unstructured# 定义剪枝比例
pruning_rate = 0.5# 对全连接层进行剪枝
def prune_model(model, pruning_rate):for name, module in model.named_modules():if isinstance(module, torch.nn.Linear):global_unstructured(module, pruning_dim=0, amount=pruning_rate)
(3)执行剪枝操作
python">prune_model(model, pruning_rate)# 查看剪枝后的模型结构
print(model)
(4) 重新训练和微调
剪枝后的模型需要重新进行训练和微调,以保证模型的准确性和性能。
python"># 定义损失函数和优化器
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
(5) 性能测试
三.蒸馏
蒸馏是指将知识从大模型(教师模型)向小模型(学生模型)传输的过程,可以用于模型压缩和训练加速。核心组件包括:知识(knowledge)、蒸馏算法(distillation algorithm)、教师学生架构(teacher-student architecture)。
1.基本原理
蒸馏的知识的形式可以是:激活、神经元、中间层特征、教师网络参数等。可将其归类为以下3种类型。
(1) Feature-Based Knowledge
基于特征的知识蒸馏引入中间层表征,教师网络的中间层作为学生网络对应层的提示(Hints层),从而提升学生网络模型的性能。核心是期望学生能够直接模仿教师网络的特征激活值。
(2) Relation-Based Knowledege
基于关系的知识蒸馏可以分为不同层之间的关系建模和不同样本之间的关系建模2种。
•不同层之间的关系建模
通常可以建模为:
其中,,表示学生网络内成对的特征图,,是相似度函数,代表教师网络与学生网络的关联函数。
•不同样本之间的关系建模
建模如下:
其中,,分别是teacher和student模型的特征表示;,。
基于关系的知识蒸馏的具体算法如下表所示。
(3) Response-Based Knowleddge
基于响应的知识蒸馏里响应一般指的是神经元的响应,即教师模型的最后1层逻辑输出。核心想法是让学生模型模仿教师网络的输出。
响应知识的loss:
Hinton提出的KD是将teacher的logits层作为soft label:
T是用于控制soft target重要程度的超参数。
整体蒸馏loss可以写作:
2.代码实例
(1) 加载数据
python">import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()# Normalize data
x_train = x_train.astype("float32") / 255.0
x_train = np.reshape(x_train, (-1, 28, 28, 1))x_test = x_test.astype("float32") / 255.0
x_test = np.reshape(x_test, (-1, 28, 28, 1))
(2) 构建teacher 、student模型结构
python"># Create the teacher
teacher = keras.Sequential([keras.Input(shape=(28, 28, 1)),layers.Conv2D(256, (3, 3), strides=(2, 2), padding="same"),layers.LeakyReLU(alpha=0.2),layers.MaxPooling2D(pool_size=(2, 2), strides=(1, 1), padding="same"),layers.Conv2D(512, (3, 3), strides=(2, 2), padding="same"),layers.Flatten(),layers.Dense(10),],name="teacher",
)# Create the student
student = keras.Sequential([keras.Input(shape=(28, 28, 1)),layers.Conv2D(16, (3, 3), strides=(2, 2), padding="same"),layers.LeakyReLU(alpha=0.2),layers.MaxPooling2D(pool_size=(2, 2), strides=(1, 1), padding="same"),layers.Conv2D(32, (3, 3), strides=(2, 2), padding="same"),layers.Flatten(),layers.Dense(10),],name="student",
)# Clone student for later comparison
student_scratch = keras.models.clone_model(student)
(3) 训练模型
python"># 1.Train teacher as usual
teacher.compile(optimizer=keras.optimizers.Adam(),loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),metrics=[keras.metrics.SparseCategoricalAccuracy()],
)
# Train and evaluate teacher on data.
teacher.fit(x_train, y_train, epochs=3)
teacher.evaluate(x_test, y_test)# 2.Train student as usual
student_scratch.compile(optimizer=keras.optimizers.Adam(),loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),metrics=[keras.metrics.SparseCategoricalAccuracy()],
)
# Train and evaluate student on data
student_scratch.fit(x_train, y_train, epochs=3)
student_scratch.evaluate(x_test, y_test)
(4) 构建蒸馏模型
python">class Distiller(keras.Model):def __init__(self, student, teacher):super(Distiller, self).__init__()self.teacher = teacherself.student = studentdef compile(self,optimizer,metrics,student_loss_fn,distillation_loss_fn,alpha=0.1,temperature=3,):super(Distiller, self).compile(optimizer=optimizer, metrics=metrics)self.student_loss_fn = student_loss_fnself.distillation_loss_fn = distillation_loss_fnself.alpha = alphaself.temperature = temperaturedef train_step(self, data):# Unpack datax, y = data# Forward pass of teacherteacher_predictions = self.teacher(x, training=False)with tf.GradientTape() as tape:# Forward pass of studentstudent_predictions = self.student(x, training=True)# Compute lossesstudent_loss = self.student_loss_fn(y, student_predictions)# Compute scaled distillation lossdistillation_loss = (self.distillation_loss_fn(tf.nn.softmax(teacher_predictions / self.temperature, axis=1),tf.nn.softmax(student_predictions / self.temperature, axis=1),)* self.temperature**2)loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss# Compute gradientstrainable_vars = self.student.trainable_variablesgradients = tape.gradient(loss, trainable_vars)# Update weightsself.optimizer.apply_gradients(zip(gradients, trainable_vars))# Update the metrics configured in `compile()`.self.compiled_metrics.update_state(y, student_predictions)# Return a dict of performanceresults = {m.name: m.result() for m in self.metrics}results.update({"student_loss": student_loss, "distillation_loss": distillation_loss})return resultsdef test_step(self, data):# Unpack the datax, y = data# Compute predictionsy_prediction = self.student(x, training=False)# Calculate the lossstudent_loss = self.student_loss_fn(y, y_prediction)# Update the metrics.self.compiled_metrics.update_state(y, y_prediction)# Return a dict of performanceresults = {m.name: m.result() for m in self.metrics}results.update({"student_loss": student_loss})return results
(5)蒸馏
python"># Train student as doen usually
student_scratch.compile(optimizer=keras.optimizers.Adam(),loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),metrics=[keras.metrics.SparseCategoricalAccuracy()],
)# Train and evaluate student trained from scratch.
student_scratch.fit(x_train, y_train, epochs=1)
student_scratch.evaluate(x_test, y_test)
四.参考
(1) Knowledge Distillation: A Survey