PyTorch 深度学习实战(13):Proximal Policy Optimization (PPO) 算法

news/2025/3/19 20:18:04/

在上一篇文章中,我们介绍了 Actor-Critic 算法,并使用它解决了 CartPole 问题。本文将深入探讨 Proximal Policy Optimization (PPO) 算法,这是一种更稳定、更高效的策略优化方法。我们将使用 PyTorch 实现 PPO 算法,并应用于经典的 CartPole 问题。


一、PPO 算法基础

PPO 是 OpenAI 提出的一种强化学习算法,旨在解决策略梯度方法中的训练不稳定问题。PPO 通过限制策略更新的幅度,确保每次更新不会偏离当前策略太远,从而提高训练的稳定性。

1. PPO 的核心思想

  • 重要性采样

    • PPO 使用重要性采样(Importance Sampling)来估计新策略的期望回报,从而避免重新采样。

  • 裁剪目标函数

    • PPO 通过裁剪策略更新的幅度,确保新策略不会偏离旧策略太远。

  • 优势函数

    • PPO 使用优势函数(Advantage Function)来评估动作的好坏,从而更高效地更新策略。

2. PPO 的优势

  • 训练稳定

    • 通过限制策略更新的幅度,PPO 避免了策略梯度方法中的训练不稳定问题。

  • 高效采样

    • PPO 可以重复使用旧策略的采样数据,从而提高数据利用率。

  • 适用范围广

    • PPO 可以应用于连续动作空间和离散动作空间的问题。

3. PPO 的算法流程

  1. 使用当前策略采样一批数据。

  2. 计算优势函数和旧策略的概率。

  3. 通过裁剪目标函数更新策略。

  4. 重复上述过程,直到策略收敛。


二、CartPole 问题实战

我们将使用 PyTorch 实现 PPO 算法,并应用于 CartPole 问题。目标是控制小车使其上的杆子保持直立。

1. 问题描述

CartPole 环境的状态空间包括小车的位置、速度、杆子的角度和角速度。动作空间包括向左或向右移动小车。智能体每保持杆子直立一步,就会获得 +1 的奖励,当杆子倾斜超过一定角度或小车移动超出范围时,游戏结束。

2. 实现步骤

  1. 安装并导入必要的库。

  2. 定义策略网络和价值网络。

  3. 定义 PPO 训练过程。

  4. 测试模型并评估性能。

3. 代码实现

以下是完整的代码实现:

import gym
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
import matplotlib.pyplot as plt
from torch.optim.lr_scheduler import StepLR
​
# 可视化设置
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
​
# 环境初始化
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
​
# 随机种子设置
SEED = 42
# env.seed(SEED) 
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)
​
# 增强型策略网络
class PolicyNetwork(nn.Module):def __init__(self, state_size, action_size):super().__init__()self.net = nn.Sequential(nn.Linear(state_size, 128),nn.ReLU(),nn.Linear(128, 128),nn.ReLU(),nn.Linear(128, 64),nn.ReLU(),nn.Linear(64, action_size))self._initialize_weights()
​def _initialize_weights(self):for m in self.modules():if isinstance(m, nn.Linear):nn.init.orthogonal_(m.weight, gain=np.sqrt(2))nn.init.constant_(m.bias, 0.0)
​def forward(self, x):return self.net(x)
​
​
# 增强型价值网络
class ValueNetwork(nn.Module):def __init__(self, state_size):super().__init__()self.net = nn.Sequential(nn.Linear(state_size, 128),nn.ReLU(),nn.Linear(128, 128),nn.ReLU(),nn.Linear(128, 64),nn.ReLU(),nn.Linear(64, 1))self._initialize_weights()
​def _initialize_weights(self):for m in self.modules():if isinstance(m, nn.Linear):nn.init.orthogonal_(m.weight, gain=1.0)nn.init.constant_(m.bias, 0.0)
​def forward(self, x):return self.net(x)
​
​
class PPO:def __init__(self, state_dim, action_dim):# 超参数设置self.gamma = 0.995self.gae_lambda = 0.97self.clip_ratio = 0.15self.update_epochs = 4self.norm_adv = True
​# 网络初始化self.actor = PolicyNetwork(state_dim, action_dim)self.critic = ValueNetwork(state_dim)
​# 优化器设置self.actor_optim = optim.Adam(self.actor.parameters(), lr=2e-4)self.critic_optim = optim.Adam(self.critic.parameters(), lr=8e-4)
​# 学习率调度self.actor_scheduler = StepLR(self.actor_optim, step_size=100, gamma=0.95)self.critic_scheduler = StepLR(self.critic_optim, step_size=100, gamma=0.95)
​def get_action(self, state):# state_tensor = torch.FloatTensor(state) # 如果使用 Gym <0.26state_tensor = torch.FloatTensor(state).unsqueeze(0)  # 如果使用 Gym >=0.26with torch.no_grad():logits = self.actor(state_tensor)probs = torch.softmax(logits, dim=-1)action = torch.multinomial(probs, 1).item()return action
​def compute_gae(self, rewards, values, next_values, dones):advantages = np.zeros_like(rewards)last_gae = 0for t in reversed(range(len(rewards))):if dones[t]:delta = rewards[t] - values[t]last_gae = deltaelse:delta = rewards[t] + self.gamma * next_values[t] - values[t]last_gae = delta + self.gamma * self.gae_lambda * last_gaeadvantages[t] = last_gaereturn torch.FloatTensor(advantages)
​def update(self, states, actions, rewards, next_states, dones):# 转换为张量states = torch.FloatTensor(np.array(states))actions = torch.LongTensor(np.array(actions))rewards = torch.FloatTensor(np.array(rewards))next_states = torch.FloatTensor(np.array(next_states))dones = torch.BoolTensor(np.array(dones))
​# 计算价值估计with torch.no_grad():current_values = self.critic(states).squeeze()next_values = self.critic(next_states).squeeze()next_values[dones] = 0.0
​# 计算GAEadvantages = self.compute_gae(rewards.numpy(),current_values.numpy(),next_values.numpy(),dones.numpy())
​# 标准化优势if self.norm_adv:advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
​# 获取旧策略概率with torch.no_grad():old_logits = self.actor(states)old_probs = torch.softmax(old_logits, dim=-1)old_log_probs = torch.log(old_probs.gather(1, actions.unsqueeze(1))).squeeze()
​# 策略优化for _ in range(self.update_epochs):logits = self.actor(states)probs = torch.softmax(logits, dim=-1)new_log_probs = torch.log(probs.gather(1, actions.unsqueeze(1))).squeeze()
​ratios = torch.exp(new_log_probs - old_log_probs)clipped_ratios = torch.clamp(ratios, 1 - self.clip_ratio, 1 + self.clip_ratio)
​policy_loss = -torch.min(ratios * advantages, clipped_ratios * advantages).mean()
​self.actor_optim.zero_grad()policy_loss.backward()torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 0.6)self.actor_optim.step()
​# 价值优化for _ in range(self.update_epochs):current_values = self.critic(states).squeeze()target_values = rewards + self.gamma * next_valuesvalue_loss = F.mse_loss(current_values, target_values)
​self.critic_optim.zero_grad()value_loss.backward()torch.nn.utils.clip_grad_norm_(self.critic.parameters(), 0.8)self.critic_optim.step()
​# 更新学习率self.actor_scheduler.step()self.critic_scheduler.step()
​
​
# 训练流程
def train_ppo(env, agent, episodes=800, early_stop=30):rewards_history = []moving_avg = []best_score = -np.infno_improve = 0
​for ep in range(episodes):# 重置环境时确保正确处理返回值# state = env.reset() # 如果使用 Gym <0.26state, _ = env.reset()  # 如果使用 Gym >=0.26episode_data = {'states': [],'actions': [],'rewards': [],'next_states': [],'dones': []}total_reward = 0done = False
​while not done:action = agent.get_action(state)# next_state, reward, done, _ = env.step(action) # 如果使用 Gym <0.26next_state, reward, done, _,_ = env.step(action) # 如果使用 Gym >=0.26
​# 处理环境提前终止if env._elapsed_steps >= env.spec.max_episode_steps:done = Truereward = 0
​# 存储轨迹episode_data['states'].append(state)episode_data['actions'].append(action)episode_data['rewards'].append(reward)episode_data['next_states'].append(next_state)episode_data['dones'].append(done)
​state = next_statetotal_reward += reward
​# 更新模型agent.update(**episode_data)
​# 记录训练进度rewards_history.append(total_reward)current_avg = np.mean(rewards_history[-50:])moving_avg.append(current_avg)
​# 早停机制if current_avg > best_score:best_score = current_avgno_improve = 0else:no_improve += 1
​if no_improve >= early_stop:print(f"早停触发于第{ep + 1}轮,最佳平均奖励: {best_score:.1f}")break
​# 进度报告if (ep + 1) % 50 == 0:print(f"Episode {ep + 1:3d} | 当前奖励: {total_reward:4.0f} | 平均奖励: {current_avg:6.1f}")
​return moving_avg, rewards_history
​
​
# 训练启动
ppo_agent = PPO(state_dim, action_dim)
moving_avg, rewards_history = train_ppo(env, ppo_agent)
# 可视化结果
plt.figure(figsize=(12, 6))
plt.plot(rewards_history, alpha=0.4, label='单轮奖励')
plt.plot(moving_avg, 'r-', linewidth=2, label='滑动平均(50轮)')
plt.xlabel('训练轮次')
plt.ylabel('奖励')
plt.title('PPO训练进程')
plt.legend()
plt.grid(True)
plt.show()

三、代码解析

  1. 策略网络和价值网络

    • 策略网络输出动作的 logits(未归一化的概率),通过 F.softmax 转换为概率分布。

    • 价值网络输出状态的价值估计。

  2. PPO 训练过程

    • 使用当前策略采样一批数据。

    • 计算优势函数和旧策略的概率。

    • 通过裁剪目标函数更新策略。

  3. 训练过程

    • 在训练过程中,每 50 个 episode 打印一次平均奖励。

    • 训练结束后,绘制训练过程中的总奖励曲线。


四、运行结果

运行上述代码后,你将看到以下输出:

  • 训练过程中每 50 个 episode 打印一次平均奖励。

  • 训练结束后,绘制训练过程中的总奖励曲线。

    Episode  50 | 当前奖励:   29 | 平均奖励:   20.8
    Episode 100 | 当前奖励:  132 | 平均奖励:   69.1
    Episode 150 | 当前奖励:  499 | 平均奖励:  261.6
    Episode 200 | 当前奖励:  295 | 平均奖励:  412.3
    Episode 250 | 当前奖励:  499 | 平均奖励:  481.8
    早停触发于第290轮,最佳平均奖励: 497.7


五、总结

本文介绍了 PPO 算法的基本原理,并使用 PyTorch 实现了一个简单的 PPO 模型来解决 CartPole 问题。通过这个例子,我们学习了如何使用 PPO 算法进行策略优化。

在下一篇文章中,我们将探讨更高级的强化学习算法,如 Deep Deterministic Policy Gradient (DDPG)。敬请期待!

代码实例说明

  • 本文代码可以直接在 Jupyter Notebook 或 Python 脚本中运行。

  • 如果你有 GPU,可以将模型和数据移动到 GPU 上运行,例如:actor = actor.to('cuda')state = state.to('cuda')

希望这篇文章能帮助你更好地理解 PPO 算法!如果有任何问题,欢迎在评论区留言讨论。


http://www.ppmy.cn/news/1580408.html

相关文章

Oraclelinux问题-/var/log/pcp/pmlogger/目录超大

有套19c rac环境&#xff0c;操作系统是oracle linux 8.10&#xff0c;日常巡检时发现/var/log/pcp/pmlogger/目录超大&#xff0c;如下 [rootdb1 ~]# du -sh /var/log/pcp/pmlogger/* 468G /var/log/pcp/pmlogger/db 1.3G /var/log/pcp/pmlogger/oracle06-106 754M /…

Python----计算机视觉处理(Opencv:图像颜色替换)

一、开运算 开运算就是对图像先进行腐蚀操作&#xff0c; 然后进行膨胀操作。开运算可以去除二值化图中的小的噪点&#xff0c;并分离相连的物体。 其主要目的就是消除那些小白点 在开运算组件中&#xff0c;有一个叫做kernel的参数&#xff0c;指的是核的大小&#xff0c;通常…

LeetCode56☞合并区间

关联LeetCode题号56 本题特点 贪心 本题思路 将二维数组排序按照左边界排序。排序后&#xff0c;右边界的大小成为找到局部最大值的关键。由题意合并区间可知&#xff0c;应该取数组的’并集‘&#xff0c;局部最优解推出全局最优解&#xff0c;每次找到局部最大的范围&am…

【软件工程】03_软件需求分析

3.1 系统分析 1. 系统分析概述 系统分析是一组统称为计算机系统工程的活动。它着眼于所有的系统元素&#xff0c;而非仅仅局限于软件。系统分析主要探索软件项目的目标、市场预期、主要的技术指标等&#xff0c;其目的在于帮助决策者做出是否进行软件项目立项的决定。 2. 可行…

Matlab 汽车二自由度转弯模型

1、内容简介 Matlab 187-汽车二自由度转弯模型 可以交流、咨询、答疑 2、内容说明 略 摘 要 本文前一部分提出了侧偏角和横摆角速度作为参数。描述了车辆运动的运动状态&#xff0c;其中文中使用的参考模型是二自由度汽车模型。汽车速度被认为是建立基于H.B.Pacejka的轮胎模…

UE5与U3D引擎对比分析

Unreal Engine 5&#xff08;UE5&#xff09;和Unity 3D&#xff08;U3D&#xff09;是两款主流的游戏引擎&#xff0c;适用于不同类型的项目开发。以下是它们的主要区别&#xff0c;分点整理&#xff1a; 1. 核心定位 UE5&#xff1a; 主打3A级高画质项目&#xff08;如主机/P…

A SURVEY ON POST-TRAINING OF LARGE LANGUAGE MODELS——大型语言模型的训练后优化综述——第8部分——数据

8 数据集 后训练技术被精心设计以提高LLMs对特定领域或任务的适应性&#xff0c;而数据集则是这一优化过程的基石。对先前研究[457, 82]的仔细审查强调了数据的质量、多样性和相关性如何深刻影响模型的有效性&#xff0c;并经常决定后训练努力的成功与否。为了阐明数据集在此背…

leetcode 75.颜色分类(荷兰国旗问题)

题目描述 题目分析 本题是经典的「荷兰国旗问题」&#xff0c;由计算机科学家 Edsger W. Dijkstra 首先提出。 要想单独解决这道题本身还是很简单的&#xff0c;统计0、1、2的数量然后按顺序赋值&#xff0c;或者手写一个冒泡排序&#xff0c;whatever。 但是在这一题中我们主…