Reinforcement Learning 06: PyTorch TD3 in Practice (BipedalWalkerHardcore-v3)


TD3: An Off-Policy, Policy-Based Algorithm

1.1 Introduction

reference: openai-TD3

DDPG's critic tends to overestimate Q-values, which can cause the actor's policy to collapse. TD3 adds three key tricks on top of DDPG to address this. The resulting TD3 (Twin Delayed DDPG, twin delayed deep deterministic policy gradient) is well suited to tasks with high-dimensional continuous action spaces.

Tricks:

  • Clipped Double Q-learning: the critic holds two Q-nets; each update produces two Q values and the smaller one is used (see the sketch after this list).
    • $Q_{target} = \min(Q_1, Q_2)$; $Q_{target} = reward + (1.0 - done) \cdot \gamma \cdot Q_{target}$
    • Note that $criticLoss = MSE(Q1_{cur}, Q_{target}) + MSE(Q2_{cur}, Q_{target})$, and the gradient is taken with respect to both $Q1_{cur}$ and $Q2_{cur}$.
  • Delayed Policy Update: the actor is updated less frequently than the critic (so the current actor parameters can generate more samples).
    • The paper suggests one policy update for every two Q-function updates.
  • Target Policy Smoothing: add noise to the action output by the target actor (essentially a form of regularization).
    • This lets the critic cover a wider neighborhood of the action space, making the networks more robust.
    • $a'(s') = clip(U_{\theta_{t}}(s') + clip(\epsilon, -c, c), a_{L}, a_{H})$
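Putting trick 1 and trick 3 together, the TD target can be computed as in the sketch below (a minimal sketch only; the helper name `td3_target` is hypothetical, and `target_actor` / `target_critic` stand for the target networks defined in the agent later in this post):

```python
import torch

@torch.no_grad()
def td3_target(reward, next_state, done, target_actor, target_critic,
               gamma=0.99, policy_noise=0.2, noise_clip=0.5, act_low=-1.0, act_high=1.0):
    # trick 3: smooth the target action with clipped Gaussian noise
    a_next = target_actor(next_state)
    noise = (torch.randn_like(a_next) * policy_noise).clip(-noise_clip, noise_clip)
    a_next = (a_next + noise).clip(act_low, act_high)
    # trick 1: take the element-wise minimum of the two target Q estimates
    q1, q2 = target_critic(next_state, a_next)
    q_target = torch.minimum(q1, q2)
    # TD target, masked by done
    return reward + (1.0 - done) * gamma * q_target
```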

1.2 PyTorch Implementation

Actor

The policy network (Actor) outputs a deterministic action directly; here all hidden-layer activations use nn.Tanh().

```python
class DT3PolicyNet(nn.Module):
    """Takes a state as input and outputs an action."""
    def __init__(self, state_dim: int, hidden_layers_dim: typ.List, action_dim: int, action_bound: float=1.0):
        super(DT3PolicyNet, self).__init__()
        self.action_bound = action_bound
        self.features = nn.ModuleList()
        for idx, h in enumerate(hidden_layers_dim):
            self.features.append(nn.ModuleDict({
                'linear': nn.Linear(hidden_layers_dim[idx-1] if idx else state_dim, h),
                'linear_action': nn.Tanh()
            }))
        self.fc_out = nn.Linear(hidden_layers_dim[-1], action_dim)

    def forward(self, x):
        for layer in self.features:
            x = layer['linear_action'](layer['linear'](x))
        return torch.tanh(self.fc_out(x)) * self.action_bound
```
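As a quick sanity check, the actor can be probed with a random batch of states (a minimal sketch; the 24-dimensional state and 4-dimensional action match BipedalWalkerHardcore-v3, and the [200, 200] hidden sizes mirror the training config used later):

```python
import torch

actor = DT3PolicyNet(state_dim=24, hidden_layers_dim=[200, 200], action_dim=4, action_bound=1.0)
dummy_state = torch.randn(8, 24)           # batch of 8 random states
dummy_action = actor(dummy_state)
print(dummy_action.shape)                  # torch.Size([8, 4])
print(dummy_action.abs().max() <= 1.0)     # tanh * action_bound keeps actions in range
```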

A DQN takes a state and returns a value for every action, then picks the action with the max value.
TD3's value network is basically the same as DDPG's, just with one extra Q-net; both Q-nets take the state and action as input and output a value.

Critic


```python
class TD3ValueNet(nn.Module):
    """Takes [state, action] as input and outputs a value."""
    def __init__(self, state_action_dim: int, hidden_layers_dim: typ.List):
        super(TD3ValueNet, self).__init__()
        self.features_q1 = nn.ModuleList()
        self.features_q2 = nn.ModuleList()
        for idx, h in enumerate(hidden_layers_dim):
            self.features_q1.append(nn.ModuleDict({
                'linear': nn.Linear(hidden_layers_dim[idx-1] if idx else state_action_dim, h),
                'linear_activation': nn.ReLU(inplace=True)
            }))
            self.features_q2.append(nn.ModuleDict({
                'linear': nn.Linear(hidden_layers_dim[idx-1] if idx else state_action_dim, h),
                'linear_activation': nn.ReLU(inplace=True)
            }))
        self.head_q1 = nn.Linear(hidden_layers_dim[-1], 1)
        self.head_q2 = nn.Linear(hidden_layers_dim[-1], 1)

    def forward(self, state, action):
        x = torch.cat([state, action], dim=1).float()  # concatenate state and action
        x1 = x
        x2 = x
        for layer1, layer2 in zip(self.features_q1, self.features_q2):
            x1 = layer1['linear_activation'](layer1['linear'](x1))
            x2 = layer2['linear_activation'](layer2['linear'](x2))
        return self.head_q1(x1), self.head_q2(x2)

    def Q1(self, state, action):
        x = torch.cat([state, action], dim=1).float()  # concatenate state and action
        for layer in self.features_q1:
            x = layer['linear_activation'](layer['linear'](x))
        return self.head_q1(x)
```
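Similarly, the twin critic consumes a `[state, action]` pair and returns two independent Q estimates, while `Q1` runs only the first head (used later for the actor loss). A minimal sketch with the same assumed dimensions as above:

```python
import torch

critic = TD3ValueNet(state_action_dim=24 + 4, hidden_layers_dim=[200, 200])
s = torch.randn(8, 24)
a = torch.randn(8, 4).clip(-1.0, 1.0)
q1, q2 = critic(s, a)                      # two independent Q estimates
print(q1.shape, q2.shape)                  # torch.Size([8, 1]) torch.Size([8, 1])
print(critic.Q1(s, a).shape)               # first head only, torch.Size([8, 1])
```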

Agent

Here we need to distinguish the exploration noise added to the actor's output from the noise used in trick 3 (Target Policy Smoothing).

  • Exploration noise:
    • Purpose: enlarge the exploration space (see the sketch after this list).
    • action_noise = np.random.normal(loc=0, scale=self.max_action * self.train_noise, size=self.action_dim)
    • Output: (act.detach().numpy()[0] + action_noise).clip(self.action_low, self.action_high)
  • Target Policy Smoothing:
    • policy_noise: policy_noise = TD3_kwargs.get('policy_noise', 0.2) * self.max_action
    • noise_clip: caps the noise magnitude; policy_noise_clip = TD3_kwargs.get('policy_noise_clip', 0.5) * self.max_action

    ```python
    @torch.no_grad()
    def smooth_action(self, state):
        act_target = self.target_actor(state)
        noise = (torch.randn(act_target.shape).float() *
                 self.policy_noise).clip(-self.policy_noise_clip, self.policy_noise_clip)
        smoothed_target_a = (act_target + noise).clip(self.action_low, self.action_high)
        return smoothed_target_a
    ```
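The exploration path, by contrast, adds unclipped Gaussian noise to the online actor at interaction time, with a scale that decays over training; the smoothing noise above is clipped and applied only to the target actor during the critic update. A minimal sketch of the exploration side (the helper name `explore_action` is hypothetical; in the agent below this logic lives in the `policy` method):

```python
import numpy as np
import torch

def explore_action(actor, state, train_noise, max_action=1.0,
                   act_low=-1.0, act_high=1.0, action_dim=4):
    with torch.no_grad():
        act = actor(torch.FloatTensor([state]))
    # Gaussian exploration noise scaled by the current (decayed) noise level
    action_noise = np.random.normal(loc=0, scale=max_action * train_noise, size=action_dim)
    return (act.numpy()[0] + action_noise).clip(act_low, act_high)
```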

TD3 agent script

```python
from ._base_net import TD3ValueNet as valueNet
from ._base_net import DT3PolicyNet as policyNet


class TD3:
    def __init__(
        self,
        state_dim: int,
        actor_hidden_layers_dim: typ.List,
        critic_hidden_layers_dim: typ.List,
        action_dim: int,
        actor_lr: float,
        critic_lr: float,
        gamma: float,
        TD3_kwargs: typ.Dict,
        device: torch.device=None
    ):
        """
        state_dim (int): dimension of the environment state
        actor_hidden_layers_dim (typ.List): actor hidden layer dimensions
        critic_hidden_layers_dim (typ.List): critic hidden layer dimensions
        action_dim (int): dimension of the action
        actor_lr (float): actor learning rate
        critic_lr (float): critic learning rate
        gamma (float): discount factor
        TD3_kwargs (typ.Dict): inputs for the three TD3 tricks
            example:
            TD3_kwargs={
                'action_low': env.action_space.low[0],
                'action_high': env.action_space.high[0],
                - soft update parameters
                'tau': 0.005,
                - trick2: Delayed Policy Update
                'delay_freq': 1,
                - trick3: Target Policy Smoothing
                'policy_noise': 0.2,
                'policy_noise_clip': 0.5,
                - exploration noise
                'expl_noise': 0.25,
                - exploration noise decays exponentially: noise = expl_noise * expl_noise_exp_reduce_factor^t
                'expl_noise_exp_reduce_factor': 0.999
            }
        device (torch.device): device to run on
        """
        if device is None:
            device = torch.device('cpu')
        self.device = device
        self.action_low = TD3_kwargs.get('action_low', -1.0)
        self.action_high = TD3_kwargs.get('action_high', 1.0)
        self.max_action = max(abs(self.action_low), abs(self.action_high))
        self.actor = policyNet(state_dim, actor_hidden_layers_dim, action_dim, action_bound=self.max_action)
        self.critic = valueNet(state_dim + action_dim, critic_hidden_layers_dim)
        self.target_actor = copy.deepcopy(self.actor)
        self.target_critic = copy.deepcopy(self.critic)
        self.actor.to(device)
        self.critic.to(device)
        self.target_actor.to(device)
        self.target_critic.to(device)
        self.actor_opt = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_opt = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.action_dim = action_dim
        self.tau = TD3_kwargs.get('tau', 0.01)
        self.policy_noise = TD3_kwargs.get('policy_noise', 0.2) * self.max_action
        self.policy_noise_clip = TD3_kwargs.get('policy_noise_clip', 0.5) * self.max_action
        self.expl_noise = TD3_kwargs.get('expl_noise', 0.25)
        self.expl_noise_exp_reduce_factor = TD3_kwargs.get('expl_noise_exp_reduce_factor', 1)
        self.delay_counter = -1
        # delayed actor update frequency: the paper suggests two critic updates per actor update, i.e. a delay of 1
        self.delay_freq = TD3_kwargs.get('delay_freq', 1)
        # Normal sigma
        self.train = False
        self.train_noise = self.expl_noise

    @torch.no_grad()
    def smooth_action(self, state):
        """trick3: Target Policy Smoothing
        add noise to the action output by the target actor
        """
        act_target = self.target_actor(state)
        noise = (torch.randn(act_target.shape).float() *
                 self.policy_noise).clip(-self.policy_noise_clip, self.policy_noise_clip)
        smoothed_target_a = (act_target + noise).clip(self.action_low, self.action_high)
        return smoothed_target_a

    @torch.no_grad()
    def policy(self, state):
        state = torch.FloatTensor([state]).to(self.device)
        act = self.actor(state)
        if self.train:
            action_noise = np.random.normal(loc=0, scale=self.max_action * self.train_noise, size=self.action_dim)
            self.train_noise *= self.expl_noise_exp_reduce_factor
            return (act.detach().numpy()[0] + action_noise).clip(self.action_low, self.action_high)
        return act.detach().numpy()[0]

    def soft_update(self, net, target_net):
        for param_target, param in zip(target_net.parameters(), net.parameters()):
            param_target.data.copy_(param_target.data * (1 - self.tau) + param.data * self.tau)

    def update(self, samples):
        self.delay_counter += 1
        state, action, reward, next_state, done = zip(*samples)
        state = torch.FloatTensor(state).to(self.device)
        action = torch.tensor(action).to(self.device)
        reward = torch.tensor(reward).view(-1, 1).to(self.device)
        next_state = torch.FloatTensor(next_state).to(self.device)
        done = torch.FloatTensor(done).view(-1, 1).to(self.device)

        # compute the target Q
        smooth_act = self.smooth_action(next_state)
        # trick1: Clipped Double Q-learning: the critic has two Q-nets; take the smaller of the two Q values
        target_Q1, target_Q2 = self.target_critic(next_state, smooth_act)
        target_Q = torch.minimum(target_Q1, target_Q2)
        target_Q = reward + (1.0 - done) * self.gamma * target_Q
        # current Q values
        current_Q1, current_Q2 = self.critic(state, action)
        q_loss = F.mse_loss(current_Q1.float(), target_Q.float().detach()) + F.mse_loss(current_Q2.float(), target_Q.float().detach())
        self.critic_opt.zero_grad()
        q_loss.backward()
        self.critic_opt.step()

        # trick2: Delayed Policy Update: update the actor less often than the critic
        if self.delay_counter == self.delay_freq:
            # delayed actor update
            ac_action = self.actor(state)
            actor_loss = -torch.mean(self.critic.Q1(state, ac_action))
            self.actor_opt.zero_grad()
            actor_loss.backward()
            self.actor_opt.step()

            self.soft_update(self.actor, self.target_actor)
            self.soft_update(self.critic, self.target_critic)
            self.delay_counter = -1

    def save(self, file_path):
        act_f = os.path.join(file_path, 'TD3_actor.ckpt')
        critic_f = os.path.join(file_path, 'TD3_critic.ckpt')
        torch.save(self.actor.state_dict(), act_f)
        torch.save(self.critic.state_dict(), critic_f)

    def load(self, file_path):
        act_f = os.path.join(file_path, 'TD3_actor.ckpt')
        critic_f = os.path.join(file_path, 'TD3_critic.ckpt')
        self.actor.load_state_dict(torch.load(act_f))
        self.critic.load_state_dict(torch.load(critic_f))
```
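`update` expects a batch of `(state, action, reward, next_state, done)` tuples sampled from a replay buffer. A minimal off-policy loop around the agent might look like the sketch below (a sketch only, assuming gym >= 0.26 so that `reset` returns `(obs, info)` and `step` returns five values; `train_off_policy` in the author's repo additionally handles reward shaping, logging and model saving):

```python
import random
from collections import deque

import gym

buffer = deque(maxlen=int(1e6))                        # off_buffer_size
env = gym.make('BipedalWalkerHardcore-v3')
state, _ = env.reset(seed=42)
agent.train = True
for step in range(100_000):
    action = agent.policy(state)                       # exploration noise is added while agent.train is True
    next_state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    buffer.append((state, action, reward, next_state, float(done)))
    state = env.reset()[0] if done else next_state
    if len(buffer) >= 2048:                            # off_minimal_size
        agent.update(random.sample(buffer, 256))       # sample_size
```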

1.2.2 Training the Agent

The complete training script is available in the author's GitHub: test_TD3

Note that tau should be set to a small value, so the target networks track the online networks slowly and the TD targets stay stable.
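For reference, the soft update in `soft_update` above is $\theta^{-} \leftarrow \tau\,\theta + (1-\tau)\,\theta^{-}$: with tau=0.005, each update moves the target networks only 0.5% of the way toward the online networks.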


```python
def BipedalWalkerHardcore_TD3_test():
    """
    policyNet:
    valueNet:
    """
    env_name = 'BipedalWalkerHardcore-v3'
    gym_env_desc(env_name)
    env = gym.make(env_name)
    print("gym.__version__ = ", gym.__version__)
    path_ = os.path.dirname(__file__)
    cfg = Config(
        env,  # environment parameters
        save_path=os.path.join(path_, "test_models", 'TD3_BipedalWalkerHardcore-v3_test_actor-3.ckpt'),
        seed=42,
        # network parameters
        actor_hidden_layers_dim=[200, 200],
        critic_hidden_layers_dim=[200, 200],
        # agent parameters
        actor_lr=1e-4,
        critic_lr=3e-4,
        gamma=0.99,
        # training parameters
        num_episode=5000,
        sample_size=256,
        # the environment is complex and varied, so keep a larger replay buffer
        off_buffer_size=int(1e6),
        off_minimal_size=2048,
        max_episode_rewards=1000,
        max_episode_steps=1000,
        # other agent parameters
        TD3_kwargs={
            'action_low': env.action_space.low[0],
            'action_high': env.action_space.high[0],
            # soft update parameters
            'tau': 0.005,
            # trick2: Delayed Policy Update
            'delay_freq': 1,
            # trick3: Target Policy Smoothing
            'policy_noise': 0.2,
            'policy_noise_clip': 0.5,
            # exploration noise
            'expl_noise': 0.25,
            # exploration noise decays exponentially: noise = expl_noise * expl_noise_exp_reduce_factor^t
            'expl_noise_exp_reduce_factor': 0.999
        }
    )
    agent = TD3(
        state_dim=cfg.state_dim,
        actor_hidden_layers_dim=cfg.actor_hidden_layers_dim,
        critic_hidden_layers_dim=cfg.critic_hidden_layers_dim,
        action_dim=cfg.action_dim,
        actor_lr=cfg.actor_lr,
        critic_lr=cfg.critic_lr,
        gamma=cfg.gamma,
        TD3_kwargs=cfg.TD3_kwargs,
        device=cfg.device
    )
    agent.train = True
    train_off_policy(env, agent, cfg, done_add=False, reward_func=reward_func)
    agent.actor.load_state_dict(torch.load(cfg.save_path))
    play(gym.make(env_name, render_mode='human'), agent, cfg, episode_count=2)


if __name__ == '__main__':
    BipedalWalkerHardcore_TD3_test()
```

1.2.3 Observing the Trained Agent

Finally, load the best network from training and watch it play.
Since the environment is fairly complex and the author trained on CPU, the agent still struggles on some of the harder randomly generated terrains.

```python
env_name = 'BipedalWalkerHardcore-v3'
agent.actor.load_state_dict(torch.load(cfg.save_path))
play(gym.make(env_name, render_mode='human'), agent, cfg, episode_count=2)
```


