联邦学习【01】杨强第三章横向联邦学习复现

news/2025/1/16 5:46:07/

环境:有无gpu均可
anaconda环境:conda install pytorch1.13.1 torchvision0.14.1 torchaudio==0.13.1 pytorch-cuda=11.6 -c pytorch -c nvidia
具体安装命令结合自己的gpu的cuda版本
在这里插入图片描述
项目代码放在
https://gitee.com/ihan1001gitee
https://github.com/ihan1001github
项目架构
在这里插入图片描述

conf.json文件,存放训练参数

{"model_name" : "resnet18","no_models" : 10,"type" : "cifar","global_epochs" :20,"local_epochs" : 3,"k" : 10,"batch_size" : 32,"lr" : 0.001,"momentum" : 0.0001,"lambda" : 0.1
}

在这里插入图片描述

model_name:模型名称
no_models:客户端总数量
type:数据集信息
global_epochs:全局迭代次数,即服务端与客户端的通信迭代次数
local_epochs:本地模型训练迭代次数
k:每一轮迭代时,服务端会从所有客户端中挑选k个客户端参与训练。
batch_size:本地训练每一轮的样本数
lr,momentum,lambda:本地训练的超参数设置

lr(learning rate)表示模型在每一次迭代时更新参数的步长大小。通常需要根据数据集和模型的具体情况来选择一个合适的学习率,如果学习率过高,则容易导致参数更新过快而无法收敛;如果学习率过低,则会使得模型收敛速度变慢甚至无法收敛。例如,可以使用optim.SGD类中的lr参数来设置学习率。momentum是一种加速梯度下降的方法,它可以在更新参数的过程中积累之前的梯度信息,并将其作为当前梯度的一部分。这样可以使得梯度下降更加平滑,避免震荡或者陷入局部极小值。通常可以将momentum设置为0.9左右,具体取值也需要根据数据集和模型来进行调整。例如,可以使用optim.SGD类中的momentum参数来设置动量。lambda是一种正则化方法,用于避免模型过拟合。正则化可以通过对模型的损失函数添加一个惩罚项来实现。通常有两种常见的正则化方法:L1正则化和L2正则化。L1正则化会使得模型的权重向量变得更加稀疏,可以通过torch.nn.L1Loss或者optim.L1Loss来实现;L2正则化则会使得模型的权重向量变得更加平滑,可以通过torch.nn.MSELoss或者optim.MSELoss来实现。参数lambda用于控制正则化项的大小,通常需要根据数据集和模型的具体情况来进行调整。

dataset.py

'''
-*- coding: utf-8 -*-
@Time    : 2023/11/23 17:35
@Author  : ihan
@File    : FederateAI
@Software: PyCharm
'''import torch
from torchvision import datasets, transforms#获取数据集
#调用语句 train_datasets, eval_datasets = datasets.get_dataset("./data/", conf["type"])
def get_dataset(dir, name):if name == 'mnist':train_dataset = datasets.MNIST(dir, train=True, download=True, transform=transforms.ToTensor())print(train_dataset)print("--------------------")'''dir 数据路径  train=True 代表训练数据集   Flase代表测试数据集download  是否从互联网下载数据集并存放在dir中transform=transforms.ToTensor() 表示将图像转换为张量形式'''eval_dataset = datasets.MNIST(dir, train=False, transform=transforms.ToTensor())print(eval_dataset)print("--------------------")elif name == 'cifar':'''transforms.Compose 是将多个transform组合起来使用(由transform构成的列表)'''transform_train = transforms.Compose([#transforms.RandomCrop(32, padding=4),#transforms.RandomCrop: 切割中心点的位置随机选取#随机裁剪原始图像,裁剪的大小为32x32像素,padding参数表示在图像的四周填充4个像素,以避免裁剪后图像边缘信息丢失。transforms.RandomHorizontalFlip(),#以0.5的概率对图像进行随机水平翻转,增加数据集的多样性。transforms.ToTensor(),#将图像转换为张量形式,便于后续处理。#transforms.Normalize: 给定均值:(R,G,B) 方差:(R,G,B),将会把Tensor正则化transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),#对图像进行标准化操作,减去均值并除以标准差。这里使用的均值和标准差是CIFAR-10数据集的全局均值和标准差,可以使模型更容易收敛。])print(transform_train)print("--------------------")transform_test = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),])print(transform_test)print("--------------------")train_dataset = datasets.CIFAR10(dir, train=True, download=True,transform=transform_train)eval_dataset = datasets.CIFAR10(dir, train=False, transform=transform_test)return train_dataset, eval_dataset
# if __name__ == '__main__':
#     train_datasets, eval_datasets = get_dataset("./data/", "cifar")
#     # print(train_datasets)
#     # print(eval_datasets)

在这里插入图片描述
server.py

'''
-*- coding: utf-8 -*-
@Time    : 2023/11/23 17:37
@Author  : ihan
@File    : FederateAI
@Software: PyCharm
'''import models, torchclass Server(object):'''定义构造函数。在构造函数中,服务端的工作包括:第一,将配置信息拷贝到服务端中;第二,按照配置中的模型信息获取模型,这里我们使用torchvision的models模块内置的ResNet - 18模型。'''def __init__(self, conf, eval_dataset):# 导入配置文件self.conf = conf# 根据配置获取模型文件self.global_model = models.get_model(self.conf["model_name"])# 生成一个测试集合加载器                                                           # 设置单个批次大小     # 打乱数据集self.eval_loader = torch.utils.data.DataLoader(eval_dataset, batch_size=self.conf["batch_size"], shuffle=True)'''定义模型聚合函数。前面我们提到服务端的主要功能是进行模型的聚合,因此定义构造函数后,我们需要在类中定义模型聚合函数,通过接收客户端上传的模型,使用聚合函数更新全局模型。聚合方案有很多种,本节我们采用经典的FedAvg算法。'''# 全局聚合模型# weight_accumulator 存储了每一个客户端的上传参数变化值/差值def model_aggregate(self, weight_accumulator):# 遍历服务器的全局模型for name, data in self.global_model.state_dict().items():# 更新每一层乘上学习率update_per_layer = weight_accumulator[name] * self.conf["lambda"]# 因为update_per_layer的type是floatTensor,所以将起转换为模型的LongTensor(有一定的精度损失)if data.type() != update_per_layer.type():data.add_(update_per_layer.to(torch.int64))else:data.add_(update_per_layer)'''定义模型评估函数。对当前的全局模型,利用评估数据评估当前的全局模型性能。通常情况下,服务端的评估函数主要对当前聚合后的全局模型进行分析,用于判断当前的模型训练是需要进行下一轮迭代、还是提前终止,或者模型是否出现发散退化的现象。根据不同的结果,服务端可以采取不同的措施策略。'''def model_eval(self):self.global_model.eval()total_loss = 0.0correct = 0dataset_size = 0for batch_id, batch in enumerate(self.eval_loader):data, target = batchdataset_size += data.size()[0]if torch.cuda.is_available():data = data.cuda()target = target.cuda()output = self.global_model(data)total_loss += torch.nn.functional.cross_entropy(output, target,reduction='sum').item()  # sum up batch losspred = output.data.max(1)[1]  # get the index of the max log-probabilitycorrect += pred.eq(target.data.view_as(pred)).cpu().sum().item()acc = 100.0 * (float(correct) / float(dataset_size))total_l = total_loss / dataset_sizereturn acc, total_l

在这里插入图片描述client.py

'''
-*- coding: utf-8 -*-
@Time    : 2023/11/23 17:33
@Author  : ihan
@File    : FederateAI
@Software: PyCharm
''''''
横向联邦学习的客户端主要功能是接收服务端的下发指令和全局模型,利用本地数据进行局部模型训练。'''import models, torch, copyclass Client(object):'''定义构造函数。在客户端构造函数中,客户端的主要工作包括:首先,将配置信息拷贝到客户端中;然后,按照配置中的模型信息获取模型,通常由服务端将模型参数传递给客户端,客户端将该全局模型覆盖掉本地模型;最后,配置本地训练数据,在本案例中,我们通过torchvision 的datasets 模块获取cifar10数据集后按客户端ID切分,不同的客户端拥有不同的子数据集,相互之间没有交集。'''def __init__(self, conf, model, train_dataset, id=-1):self.conf = confself.local_model = models.get_model(self.conf["model_name"])self.client_id = idself.train_dataset = train_datasetall_range = list(range(len(self.train_dataset)))data_len = int(len(self.train_dataset) / self.conf['no_models'])train_indices = all_range[id * data_len: (id + 1) * data_len]self.train_loader = torch.utils.data.DataLoader(self.train_dataset, batch_size=conf["batch_size"],sampler=torch.utils.data.sampler.SubsetRandomSampler(train_indices))'''定义模型本地训练函数。本例是一个图像分类的例子,因此,我们使用交叉熵作为本地模型的损失函数,利用梯度下降来求解并更新参数值'''def local_train(self, model):for name, param in model.state_dict().items():self.local_model.state_dict()[name].copy_(param.clone())# print(id(model))optimizer = torch.optim.SGD(self.local_model.parameters(), lr=self.conf['lr'],momentum=self.conf['momentum'])# print(id(self.local_model))self.local_model.train()for e in range(self.conf["local_epochs"]):for batch_id, batch in enumerate(self.train_loader):data, target = batchif torch.cuda.is_available():data = data.cuda()target = target.cuda()optimizer.zero_grad()output = self.local_model(data)loss = torch.nn.functional.cross_entropy(output, target)loss.backward()optimizer.step()print("Epoch %d done." % e)diff = dict()for name, data in self.local_model.state_dict().items():diff[name] = (data - model.state_dict()[name])# print(diff[name])return diff

main.py

'''
-*- coding: utf-8 -*-
@Time    : 2023/11/23 17:33
@Author  : ihan
@File    : FederateAI
@Software: PyCharm
'''
import argparse, json
import datetime
import os
import logging
import torch, random
import ihanfrom server import *
from client import *
import models, datasetsif __name__ == '__main__':parser = argparse.ArgumentParser(description='Federated Learning')parser.add_argument('-c', '--conf', dest='conf')args = parser.parse_args()#读取配置文件信息。with open(args.conf, 'r') as f:conf = json.load(f)ihan.saveIn(conf["model_name"],conf["no_models"],conf["type"],conf["global_epochs"],conf["local_epochs"],conf["k"])#获取数据集train_datasets, eval_datasets = datasets.get_dataset("./data/", conf["type"])#分别定义一个服务端对象和多个客户端对象,用来模拟横向联邦训练场景。server = Server(conf, eval_datasets)clients = []for c in range(conf["no_models"]):clients.append(Client(conf, server.global_model, train_datasets, c))print("\n\n")#每一轮的迭代,服务端会从当前的客户端集合中随机挑选一部分参与本轮迭代训练,#被选中的客户端调用本地训练接口local_train进行本地训练,#最后服务端调用模型聚合函数model_aggregate来更新全局模型for e in range(conf["global_epochs"]):candidates = random.sample(clients, conf["k"])weight_accumulator = {}for name, params in server.global_model.state_dict().items():weight_accumulator[name] = torch.zeros_like(params)for c in candidates:diff = c.local_train(server.global_model)for name, params in server.global_model.state_dict().items():weight_accumulator[name].add_(diff[name])server.model_aggregate(weight_accumulator)acc, loss = server.model_eval()print("Epoch %d, acc: %f, loss: %f\n" % (e, acc, loss))ihan.savePara(e,acc,loss)

ihan.py 用于输出实验结果,写入txt

'''
-*- coding: utf-8 -*-
@Time    : 2023/12/25 22:53
@Author  : ihan
@File    : FederateAI
@Software: PyCharm
'''from datetime import datetime
import time		# python里的日期模块def savePara(e,acc,loss):date = time.strftime('%Y-%m-%d %H:%M:%S').split()		# 按空格分开,这里宫格是我自己添加的print(date)     # ['2021-07-22', '16:11:00']hour_minute = date[1].split(':')print(hour_minute)      # 时,分,秒filepath = "/home/"+ date[0]+'.txt'output = "%s:Epoch %d  training accuracy :  %g, train Loss : %f " % (datetime.now(), e, acc, loss)with open(filepath,"a+") as f:f.write(output+'\n')f.close
def saveIn(model_name,no_models,date_type,global_epochs,local_epochs,current_clients):date = time.strftime('%Y-%m-%d %H:%M:%S').split()		# 按空格分开,这里宫格是我自己添加的print(date)     # ['2021-07-22', '16:11:00']hour_minute = date[1].split(':')print(hour_minute)      # 时,分,秒filepath = "/home/"+ date[0]+'.txt'output="%s:model_name %s,no_models %d,date_type %s,global_epochs %d,local_epochs %d,current_clients %d start"\% (datetime.now(), model_name,no_models,date_type,global_epochs,local_epochs,current_clients)with open(filepath, "a+") as f:f.write(output + '\n')f.close

在这里插入图片描述
关键代码解释
更新每一层乘上lambda=1/k

update_per_layer = weight_accumulator[name] * self.conf["lambda"]
data.add_(update_per_layer.to(torch.int64))

在这里插入图片描述weight_accumulator[name]代表
在这里插入图片描述

计算weight_accumulator[name]

for name, params in server.global_model.state_dict().items():weight_accumulator[name].add_(diff[name])   

需要先计算diff[name]

for name, data in self.local_model.state_dict().items():diff[name] = (data - model.state_dict()[name])

这段代码是在遍历self.local_model的状态字典,并计算本地模型参数与全局模型参数之间的差异,将差异存储在diff字典中。

具体来说,对于本地模型中的每个参数,通过访问全局模型的状态字典model.state_dict()来获取对应参数的数值,然后计算本地模型参数与全局模型参数之间的差异,并将这个差异存储在diff字典中,以参数的名称作为键。

这个操作的目的可能是为了计算本地模型相对于全局模型的参数更新量。在一些分布式或异步更新的训练算法中,本地模型通常是在单个机器上训练得到的,而全局模型则是由多个机器上的模型进行聚合得到的。为了将本地模型的参数更新同步到全局模型,需要计算本地模型参数与全局模型参数之间的差异,并将这个差异用于更新全局模型的参数。

完成

server.model_aggregate(weight_accumulator)

在这里插入图片描述


http://www.ppmy.cn/news/1465953.html

相关文章

设计模式(简要,应付软考)

简单工厂模式(Simple Factory Pattern): 又称为静态工厂方法(Static Factory Method)模式,它属于类创建型模式。在简单工厂模式中,可以根据参数的不同返回不同类的实例。简单工厂模式专门定义一个类来负责创建其他类的实例,被创建的实例通常都具有共同的父类。 单例实例&#…

Vue3-Ref Reactive toRef toRefs对比学习、标签ref与组件ref

响应式数据: Ref 作用:定义响应式变量。 语法:let xxx ref(初始值)(里面可以是任何规定内类型、数组等)。 返回值:一个RefImpl的实例对象,简称ref对象或ref,ref对象的value属性是响应式的。 注意点&am…

基础—SQL—图形化界面工具的DataGrip使用(2)

一、回顾与引言 (1) 上次内容,博客讲到了DDL语句的数据库操作、表操作、表字段的操作的相关语法,然而之前都是在MySQL的命令行当中去操作演示的。这种方式可以用,但是使用的话,第一,在我们日常…

npm发布、更新、删除包

如何将自己开发的依赖包发布到npmjs上供别人使用?五个步骤搞定! 实现步骤: 创建自己的工具包项目,进行开发。注册npmjs账号。执行npm login在控制台登录,填写用户信息。执行npm publish发布包。更新及删除。 步骤一…

Rust语言实现的去中心化AI网络节点

一、概述 去中心化和人工智能(AI)是两个极具潜力的发展方向。Gaia项目正是将这两者结合起来,创造了一个去中心化的AI网络节点。本文将深入探讨Gaia项目的技术细节,通过丰富的示例和详细描述,帮助读者全面理解并掌握该…

神经网络算法详解与前沿探索

神经网络算法详解与前沿探索 随着人工智能技术的迅猛发展,神经网络成为机器学习领域的重要组成部分,广泛应用于图像识别、自然语言处理和推荐系统等。本文将详细探讨神经网络的基本原理、结构、训练过程及其应用实例,并扩展至更多相关领域和…

牛客热题:数组中出现一次的两个数字

📟作者主页:慢热的陕西人 🌴专栏链接:力扣刷题日记 📣欢迎各位大佬👍点赞🔥关注🚓收藏,🍉留言 文章目录 牛客热题:数组中出现一次的两个数字题目链…

基于飞书机器人跨账号消息提醒

事情的起因是飞书中不同的账号不能同时登录,虽然可以在飞书的账号切换页面看到其他账号下是否有消息提醒(小红点),但是无法实现提醒功能,很不优雅,因此本文尝试提出一种新的方式实现不同账号之间的提醒功能…