Learn a Little Reinforcement Learning Every Day (2)


Modifying the imitation learning code from Hands-On Reinforcement Learning (《动手学强化学习》)


Because of differences between Gym versions, a few parts of the imitation learning code need to be modified. The modified functions are shown below.
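For context, here is a minimal sketch of what changed in the environment interface. This is my own illustration, assuming Gym >= 0.26 (or Gymnasium); it is not code from the book:

import gym

env = gym.make('CartPole-v0')

# Old API:
#   state = env.reset()                                # returned only the observation
#   next_state, reward, done, info = env.step(action)  # returned 4 values
# New API:
state, info = env.reset()                              # returns (observation, info)
action = env.action_space.sample()
next_state, reward, terminated, truncated, info = env.step(action)  # returns 5 values
done = terminated or truncated                         # the episode ends on either condition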

def test_agent(agent, env, n_episode):
    return_list = []
    for episode in range(n_episode):
        episode_return = 0
        state = env.reset()[0]  # modified: reset() now returns (observation, info)
        done = False
        while not done:
            action = agent.take_action(state)
            next_state, reward, terminated, truncated, info = env.step(action)  # modified: step() now returns 5 values
            done = terminated or truncated
            state = next_state
            episode_return += reward
        return_list.append(episode_return)
    return np.mean(return_list)
class BehaviorClone:
    def __init__(self, state_dim, hidden_dim, action_dim, lr):
        self.policy = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)

    def learn(self, states, actions):
        states = torch.tensor(states, dtype=torch.float).to(device)
        actions = torch.tensor(actions, dtype=torch.long).view(-1, 1).to(device)  # modified: actions must be converted to dtype=torch.long for gather()
        log_probs = torch.log(self.policy(states).gather(1, actions))
        bc_loss = torch.mean(-log_probs)  # maximum-likelihood (negative log-likelihood) loss
        self.optimizer.zero_grad()
        bc_loss.backward()
        self.optimizer.step()
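The bc_loss above is the negative log-likelihood of the expert actions under the current policy, i.e. behavior cloning is maximum-likelihood estimation on the expert data:

$$
\mathcal{L}_{\mathrm{BC}}(\theta) = -\frac{1}{|B|} \sum_{(s,a) \in B} \log \pi_\theta(a \mid s),
$$

where $B$ is the sampled batch of expert state-action pairs and $\pi_\theta$ is the policy network (PolicyNet).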

The modified rl_utils code:

from tqdm import tqdm
import numpy as np
import torch
import collections
import random


class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        transitions = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*transitions)
        return np.array(state), action, reward, np.array(next_state), done

    def size(self):
        return len(self.buffer)


def moving_average(a, window_size):
    cumulative_sum = np.cumsum(np.insert(a, 0, 0))
    middle = (cumulative_sum[window_size:] - cumulative_sum[:-window_size]) / window_size
    r = np.arange(1, window_size - 1, 2)
    begin = np.cumsum(a[:window_size - 1])[::2] / r
    end = (np.cumsum(a[:-window_size:-1])[::2] / r)[::-1]
    return np.concatenate((begin, middle, end))


def train_on_policy_agent(env, agent, num_episodes):
    return_list = []
    for i in range(10):
        with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
            for i_episode in range(int(num_episodes / 10)):
                episode_return = 0
                transition_dict = {'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': []}
                state = env.reset()[0]  # modified: reset() now returns (observation, info)
                done = False
                while not done:
                    action = agent.take_action(state)
                    next_state, reward, terminated, truncated, _ = env.step(action)  # modified: step() now returns 5 values
                    done = terminated or truncated
                    transition_dict['states'].append(state)
                    transition_dict['actions'].append(action)
                    transition_dict['next_states'].append(next_state)
                    transition_dict['rewards'].append(reward)
                    transition_dict['dones'].append(done)
                    state = next_state
                    episode_return += reward
                return_list.append(episode_return)
                agent.update(transition_dict)  # one update per trajectory; the update loops over the data `epochs` times, so there is some repeated computation
                if (i_episode + 1) % 10 == 0:
                    pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode + 1), 'return': '%.3f' % np.mean(return_list[-10:])})
                pbar.update(1)
    return return_list


def train_off_policy_agent(env, agent, num_episodes, replay_buffer, minimal_size, batch_size):
    return_list = []
    for i in range(10):
        with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
            for i_episode in range(int(num_episodes / 10)):
                episode_return = 0
                state = env.reset()[0]  # modified for the new Gym API
                done = False
                while not done:
                    action = agent.take_action(state)
                    next_state, reward, terminated, truncated, _ = env.step(action)  # modified for the new Gym API
                    done = terminated or truncated
                    replay_buffer.add(state, action, reward, next_state, done)
                    state = next_state
                    episode_return += reward
                    if replay_buffer.size() > minimal_size:
                        b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size)
                        transition_dict = {'states': b_s, 'actions': b_a, 'next_states': b_ns, 'rewards': b_r, 'dones': b_d}
                        agent.update(transition_dict)
                return_list.append(episode_return)
                if (i_episode + 1) % 10 == 0:
                    pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode + 1), 'return': '%.3f' % np.mean(return_list[-10:])})
                pbar.update(1)
    return return_list


def compute_advantage(gamma, lmbda, td_delta):
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()
    return torch.tensor(advantage_list, dtype=torch.float)
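As a side note, compute_advantage implements generalized advantage estimation (GAE): given the TD errors $\delta_t$, the reversed loop accumulates

$$
A_t = \delta_t + \gamma \lambda A_{t+1}, \qquad A_T = \delta_T,
$$

which is exactly the recursion computed by iterating over td_delta back to front.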

The complete imitation learning code (behavior cloning):

import gym
import torch
import torch.nn.functional as F
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import random
import rl_utils


class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)


class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)


class PPO:
    ''' PPO algorithm with the clipped surrogate objective '''
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                 lmbda, epochs, eps, gamma, device):
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.epochs = epochs  # number of training epochs on one trajectory's data
        self.eps = eps  # clipping-range parameter in PPO
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions'], dtype=torch.long).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        advantage = rl_utils.compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
        old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach()
        for _ in range(self.epochs):
            log_probs = torch.log(self.actor(states).gather(1, actions))
            ratio = torch.exp(log_probs - old_log_probs)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage  # clipped surrogate
            actor_loss = torch.mean(-torch.min(surr1, surr2))  # PPO loss
            critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()


actor_lr = 1e-3
critic_lr = 1e-2
num_episodes = 50
hidden_dim = 128
gamma = 0.98
lmbda = 0.95
epochs = 10
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'CartPole-v0'
env = gym.make(env_name)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
ppo_agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda,
                epochs, eps, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, ppo_agent, num_episodes)


def sample_expert_data(n_episode):
    states = []
    actions = []
    for episode in range(n_episode):
        state = env.reset()[0]  # modified: reset() now returns (observation, info)
        done = False
        while not done:
            action = ppo_agent.take_action(state)
            states.append(state)
            actions.append(action)
            next_state, reward, terminated, truncated, info = env.step(action)  # modified: step() now returns 5 values
            done = terminated or truncated
            state = next_state
    return np.array(states), np.array(actions)


torch.manual_seed(0)
random.seed(0)
n_episode = 1
expert_s, expert_a = sample_expert_data(n_episode)

n_samples = 30  # subsample 30 expert transitions
random_index = random.sample(range(expert_s.shape[0]), n_samples)
expert_s = expert_s[random_index]
expert_a = expert_a[random_index]


class BehaviorClone:
    def __init__(self, state_dim, hidden_dim, action_dim, lr):
        self.policy = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)

    def learn(self, states, actions):
        states = torch.tensor(states, dtype=torch.float).to(device)
        actions = torch.tensor(actions, dtype=torch.long).view(-1, 1).to(device)  # modified: actions must be dtype=torch.long for gather()
        log_probs = torch.log(self.policy(states).gather(1, actions))
        bc_loss = torch.mean(-log_probs)  # maximum-likelihood (negative log-likelihood) loss
        self.optimizer.zero_grad()
        bc_loss.backward()
        self.optimizer.step()

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(device)
        probs = self.policy(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()


def test_agent(agent, env, n_episode):
    return_list = []
    for episode in range(n_episode):
        episode_return = 0
        state = env.reset()[0]  # modified: reset() now returns (observation, info)
        done = False
        while not done:
            action = agent.take_action(state)
            next_state, reward, terminated, truncated, info = env.step(action)  # modified: step() now returns 5 values
            done = terminated or truncated
            state = next_state
            episode_return += reward
        return_list.append(episode_return)
    return np.mean(return_list)


torch.manual_seed(0)
np.random.seed(0)

lr = 1e-3
bc_agent = BehaviorClone(state_dim, hidden_dim, action_dim, lr)
n_iterations = 1000
batch_size = 64
test_returns = []
with tqdm(total=n_iterations, desc="Progress") as pbar:
    for i in range(n_iterations):
        sample_indices = np.random.randint(low=0, high=expert_s.shape[0], size=batch_size)
        bc_agent.learn(expert_s[sample_indices], expert_a[sample_indices])  # the action labels come from the expert data
        current_return = test_agent(bc_agent, env, 5)
        test_returns.append(current_return)
        if (i + 1) % 10 == 0:
            pbar.set_postfix({'return': '%.3f' % np.mean(test_returns[-10:])})
        pbar.update(1)

iteration_list = list(range(len(test_returns)))
plt.plot(iteration_list, test_returns)
plt.xlabel('Iterations')
plt.ylabel('Returns')
plt.title('BC on {}'.format(env_name))
plt.show()
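The resulting curve is fairly noisy, since each point averages only 5 evaluation episodes. If a smoother plot is desired, the moving_average helper already defined in rl_utils can be reused before plotting. A small optional sketch, continuing the script above and assuming a smoothing window of 9:

mv_return = rl_utils.moving_average(test_returns, 9)  # smooth the per-iteration BC returns
plt.plot(iteration_list, mv_return)
plt.xlabel('Iterations')
plt.ylabel('Returns')
plt.title('BC on {}'.format(env_name))
plt.show()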

Reinforcement learning resources

  1. Hands-On Reinforcement Learning (动手学强化学习): the code is simple and easy to follow
  2. Mathematical Foundations of Reinforcement Learning (强化学习的数学原理): explains the theory behind reinforcement learning; worth revisiting regularly
  3. Spinning Up: well suited to beginners, and the code is a useful reference
