【强化学习】Q-Learning算法求解悬崖行走问题 + Python代码实战

news/2024/10/30 19:36:41/

文章目录

  • 一、Q-Learning算法简介
    • 1.1 更新公式
    • 1.2 预测策略
    • 1.3 详细资料
  • 二、Python代码实战
    • 2.1 运行前配置
    • 2.2 主要代码
    • 2.3 运行结果展示
    • 2.4 关于可视化寻路过程的设置


一、Q-Learning算法简介

下面仅对Q-Learning算法对简单介绍

Q学习是一种异策略(off-policy)算法。

异策略在学习的过程中,有两种不同的策略:目标策略(target policy)和行为策略(behavior policy)。

目标策略就是我们需要去学习的策略,相当于后方指挥的军师,它不需要直接与环境进行交互

行为策略是探索环境的策略,负责与环境交互,然后将采集的轨迹数据送给目标策略进行学习,而且为送给目标策略的数据中不需要 a t + 1 a_{t+1} at+1,而Sarsa是要有 a t + 1 a_{t+1} at+1的。

Q学习不会管我们下一步去往哪里探索,它只选取奖励最大的策略

1.1 更新公式

Q-Learning的更新公式

Q ( s t , a t ) ← Q ( s t , a t ) + α [ r t + 1 + γ max ⁡ a Q ( s t + 1 , a ) − Q ( s t , a t ) ] Q\left(s_t, a_t\right) \leftarrow Q\left(s_t, a_t\right)+\alpha\left[r_{t+1}+\gamma \max _a Q\left(s_{t+1}, a\right)-Q\left(s_t, a_t\right)\right] Q(st,at)Q(st,at)+α[rt+1+γamaxQ(st+1,a)Q(st,at)]

1.2 预测策略

Q-Learning算法采用 ε \varepsilon ε-贪心搜索的策略(和Sarsa算法一样)

1.3 详细资料

关于更加详细的Q-Learning算法的介绍,请看我之前发的博客:【EasyRL学习笔记】第三章 表格型方法(Q-Table、Sarsa、Q-Learning)

在学习Q-Learning算法前你最好能了解以下知识点:

  • 时序差分方法
  • ε \varepsilon ε-贪心搜索策略
  • Q-Table

二、Python代码实战

2.1 运行前配置

准备好一个RL_Utils.py文件,文件内容可以从我的一篇里博客获取:【RL工具类】强化学习常用函数工具类(Python代码)

这一步很重要,后面需要引入该RL_Utils.py文件

在这里插入图片描述

2.2 主要代码

import argparse
import datetime
import math
import os
import time
import turtle
from collections import defaultdict
import dill
import gym
# 这里需要改成自己的RL_Utils.py文件的路径
from Python.ReinforcementLearning.EasyRL.RL_Utils import *# 悬崖行走地图
class CliffWalkingWapper(gym.Wrapper):def __init__(self, env):gym.Wrapper.__init__(self, env)self.t = Noneself.unit = 50self.max_x = 12self.max_y = 4def draw_x_line(self, y, x0, x1, color='gray'):assert x1 > x0self.t.color(color)self.t.setheading(0)self.t.up()self.t.goto(x0, y)self.t.down()self.t.forward(x1 - x0)def draw_y_line(self, x, y0, y1, color='gray'):assert y1 > y0self.t.color(color)self.t.setheading(90)self.t.up()self.t.goto(x, y0)self.t.down()self.t.forward(y1 - y0)def draw_box(self, x, y, fillcolor='', line_color='gray'):self.t.up()self.t.goto(x * self.unit, y * self.unit)self.t.color(line_color)self.t.fillcolor(fillcolor)self.t.setheading(90)self.t.down()self.t.begin_fill()for i in range(4):self.t.forward(self.unit)self.t.right(90)self.t.end_fill()def move_player(self, x, y):self.t.up()self.t.setheading(90)self.t.fillcolor('red')self.t.goto((x + 0.5) * self.unit, (y + 0.5) * self.unit)def render(self):if self.t == None:self.t = turtle.Turtle()self.wn = turtle.Screen()self.wn.setup(self.unit * self.max_x + 100,self.unit * self.max_y + 100)self.wn.setworldcoordinates(0, 0, self.unit * self.max_x,self.unit * self.max_y)self.t.shape('circle')self.t.width(2)self.t.speed(0)self.t.color('gray')for _ in range(2):self.t.forward(self.max_x * self.unit)self.t.left(90)self.t.forward(self.max_y * self.unit)self.t.left(90)for i in range(1, self.max_y):self.draw_x_line(y=i * self.unit, x0=0, x1=self.max_x * self.unit)for i in range(1, self.max_x):self.draw_y_line(x=i * self.unit, y0=0, y1=self.max_y * self.unit)for i in range(1, self.max_x - 1):self.draw_box(i, 0, 'black')self.draw_box(self.max_x - 1, 0, 'yellow')self.t.shape('turtle')x_pos = self.s % self.max_xy_pos = self.max_y - 1 - int(self.s / self.max_x)self.move_player(x_pos, y_pos)# Q_Learning智能体对象
class Q_Learning:def __init__(self, arg_dict):# 采样次数self.sample_count = 0# 动作数self.n_actions = arg_dict['n_actions']# 学习率self.lr = arg_dict['lr']# 未来奖励衰减系数self.gamma = arg_dict['gamma']# 当前的epsilon值self.epsilon = arg_dict['epsilon_start']# 初始的epsilon值self.epsilon_start = arg_dict['epsilon_start']# 最后的epsilon值self.epsilon_end = arg_dict['epsilon_end']# epsilon衰变参数self.epsilon_decay = arg_dict['epsilon_decay']# 使用嵌套字典表示Q(s,a),这里首先将所有Q(s、a)设置为0self.Q_table = defaultdict(lambda: np.zeros(self.n_actions))# 训练过程: 用e-greedy policy获取行动def sample_action(self, state):# 采样数更新self.sample_count += 1# 计算当前epsilon值self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \math.exp(-1. * self.sample_count / self.epsilon_decay)# 根据均匀分布获取一个0-1的随机值,如果随机值大于当前epsilon,则按照最大Q值来选择动作,否则随机选择一个动作return np.argmax(self.Q_table[str(state)]) if np.random.uniform(0, 1) > self.epsilon else np.random.choice(self.n_actions)# 测试过程: 用最大Q值获取行动def predict_action(self, state):return np.argmax(self.Q_table[str(state)])# 更新Q表格def update(self, state, action, reward, next_state, done):# 计算Q估计Q_predict = self.Q_table[str(state)][action]# 计算Q现实if done:# 如果回合结束,则直接等于当前奖励Q_target = rewardelse:# 如果回合每结束,则按照Q_target = reward + self.gamma * np.max(self.Q_table[str(next_state)])# 根据Q估计和Q现实,差分地更新Q表格self.Q_table[str(state)][action] += self.lr * (Q_target - Q_predict)# 保存模型def save_model(self, path):# 如果路径不存在,就自动创建Path(path).mkdir(parents=True, exist_ok=True)torch.save(obj=self.Q_table,f=path + "checkpoint.pkl",pickle_module=dill)# 加载模型def load_model(self, path):self.Q_table = torch.load(f=path + 'checkpoint.pkl', pickle_module=dill)# 训练函数
def train(arg_dict, env, agent):# 开始计时startTime = time.time()print(f"环境名: {arg_dict['env_name']}, 算法名: {arg_dict['algo_name']}, Device: {arg_dict['device']}")print("开始训练智能体......")# 记录每个epoch的奖励rewards = []# 记录每个epoch的智能体到达终点用的步数steps = []for epoch in range(arg_dict['train_eps']):# 每个epoch的总奖励ep_reward = 0# 每个epoch的步数记录器ep_step = 0# 重置环境,并获取初始状态state = env.reset()while True:# 画图if arg_dict['train_render']:env.render()# 根据e-贪心策略获取动作action = agent.sample_action(state)# 执行动作,获得下一个状态、奖励和是否结束当前回合的标志,并更新环境next_state, reward, done, _ = env.step(action)# 智能体更新,根据当前状态和动作、下一个状态和奖励,改进Q函数agent.update(state, action, reward, next_state, done)# 更新当前状态为下一时刻状态state = next_state# 累加记录奖励ep_reward += reward# 步数+1ep_step += 1# 如果当前回合结束,则跳出循环if done:break# 记录奖励、步数信息rewards.append(ep_reward)steps.append(ep_step)# 每隔10次迭代就输出一次if (epoch + 1) % 10 == 0:print(f'Epoch: {epoch + 1}/{arg_dict["train_eps"]}, Reward: {ep_reward:.2f}, Steps:{ep_step}, Epislon: {agent.epsilon:.3f}')print("智能体训练结束 , 用时: " + str(time.time() - startTime) + " s")return {'epochs': range(len(rewards)), 'rewards': rewards, 'steps': steps}# 测试函数
def test(arg_dict, env, agent):startTime = time.time()print("开始测试智能体......")print(f"环境名: {arg_dict['env_name']}, 算法名: {arg_dict['algo_name']}, Device: {arg_dict['device']}")# 记录每个epoch的奖励rewards = []# 记录每个epoch的智能体到达终点用的步数steps = []for epoch in range(arg_dict['test_eps']):# 每个epoch的总奖励ep_reward = 0# 每个epoch的步数记录器ep_step = 0# 重置环境,并获取初始状态state = env.reset()while True:# 画图if arg_dict['test_render']:env.render()# 根据最大Q值选择动作action = agent.predict_action(state)# 执行动作,获得下一个状态、奖励和是否结束当前回合的标志,并更新环境next_state, reward, done, _ = env.step(action)# 更新当前状态为下一时刻状态state = next_state# 累加记录奖励ep_reward += reward# 步数+1ep_step += 1# 如果当前回合结束,则跳出循环if done:break# 记录奖励、步数信息rewards.append(ep_reward)steps.append(ep_step)# 输出测试信息print(f"Epochs: {epoch + 1}/{arg_dict['test_eps']}, Steps:{ep_step}, Reward: {ep_reward:.2f}")print("测试结束 , 用时: " + str(time.time() - startTime) + " s")return {'episodes': range(len(rewards)), 'rewards': rewards, 'steps': steps}# 创建环境和智能体
def create_env_agent(arg_dict):# 创建环境env = gym.make(arg_dict['env_name'])env = CliffWalkingWapper(env)# 设置随机种子all_seed(env, seed=arg_dict["seed"])# 获取状态数try:n_states = env.observation_space.nexcept AttributeError:n_states = env.observation_space.shape[0]# 获取动作数n_actions = env.action_space.nprint(f"状态数: {n_states}, 动作数: {n_actions}")# 将状态数和动作数加入算法参数字典arg_dict.update({"n_states": n_states, "n_actions": n_actions})# 实例化智能体对象agent = Q_Learning(arg_dict)# 返回环境,智能体return env, agentif __name__ == '__main__':# 防止报错 OMP: Error #15: Initializing libiomp5md.dll, but found libiomp5md.dll already initialized.os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"# 获取当前路径curr_path = os.path.dirname(os.path.abspath(__file__))# 获取当前时间curr_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S")# 相关参数设置parser = argparse.ArgumentParser(description="hyper parameters")parser.add_argument('--algo_name', default='Q-learning', type=str, help="name of algorithm")parser.add_argument('--env_name', default='CliffWalking-v0', type=str, help="name of environment")parser.add_argument('--train_eps', default=400, type=int, help="episodes of training")parser.add_argument('--test_eps', default=20, type=int, help="episodes of testing")parser.add_argument('--gamma', default=0.90, type=float, help="discounted factor")parser.add_argument('--epsilon_start', default=0.95, type=float, help="initial value of epsilon")parser.add_argument('--epsilon_end', default=0.01, type=float, help="final value of epsilon")parser.add_argument('--epsilon_decay', default=300, type=int, help="decay rate of epsilon")parser.add_argument('--lr', default=0.1, type=float, help="learning rate")parser.add_argument('--device', default='cpu', type=str, help="cpu or cuda")parser.add_argument('--seed', default=520, type=int, help="seed")parser.add_argument('--show_fig', default=False, type=bool, help="if show figure or not")parser.add_argument('--save_fig', default=True, type=bool, help="if save figure or not")parser.add_argument('--train_render', default=False, type=bool,help="Whether to render the environment during training")parser.add_argument('--test_render', default=True, type=bool,help="Whether to render the environment during testing")args = parser.parse_args()default_args = {'result_path': f"{curr_path}/outputs/{args.env_name}/{curr_time}/results/",'model_path': f"{curr_path}/outputs/{args.env_name}/{curr_time}/models/",}# 将参数转化为字典 type(dict)arg_dict = {**vars(args), **default_args}print("算法参数字典:", arg_dict)# 创建环境和智能体env, agent = create_env_agent(arg_dict)# 传入算法参数、环境、智能体,然后开始训练res_dic = train(arg_dict, env, agent)print("算法返回结果字典:", res_dic)# 保存相关信息agent.save_model(path=arg_dict['model_path'])save_args(arg_dict, path=arg_dict['result_path'])save_results(res_dic, tag='train', path=arg_dict['result_path'])plot_rewards(res_dic['rewards'], arg_dict, path=arg_dict['result_path'], tag="train")# =================================================================================================# 创建新环境和智能体用来测试print("=" * 300)env, agent = create_env_agent(arg_dict)# 加载已保存的智能体agent.load_model(path=arg_dict['model_path'])res_dic = test(arg_dict, env, agent)save_results(res_dic, tag='test', path=arg_dict['result_path'])plot_rewards(res_dic['rewards'], arg_dict, path=arg_dict['result_path'], tag="test")

2.3 运行结果展示

由于有些输出太长,下面只展示部分输出

状态数: 48, 动作数: 4
环境名: CliffWalking-v0, 算法名: Q-learning, Device: cpu
开始训练智能体......
Epoch: 10/400, Reward: -182.00, Steps:182, Epislon: 0.010
Epoch: 20/400, Reward: -63.00, Steps:63, Epislon: 0.010
Epoch: 30/400, Reward: -48.00, Steps:48, Epislon: 0.010
Epoch: 40/400, Reward: -54.00, Steps:54, Epislon: 0.010
Epoch: 50/400, Reward: -51.00, Steps:51, Epislon: 0.010
Epoch: 60/400, Reward: -194.00, Steps:95, Epislon: 0.010
Epoch: 70/400, Reward: -46.00, Steps:46, Epislon: 0.010
Epoch: 80/400, Reward: -57.00, Steps:57, Epislon: 0.010
Epoch: 90/400, Reward: -37.00, Steps:37, Epislon: 0.010
Epoch: 100/400, Reward: -68.00, Steps:68, Epislon: 0.010
Epoch: 110/400, Reward: -33.00, Steps:33, Epislon: 0.010
Epoch: 120/400, Reward: -47.00, Steps:47, Epislon: 0.010
Epoch: 130/400, Reward: -23.00, Steps:23, Epislon: 0.010
Epoch: 140/400, Reward: -16.00, Steps:16, Epislon: 0.010
Epoch: 150/400, Reward: -53.00, Steps:53, Epislon: 0.010
Epoch: 160/400, Reward: -42.00, Steps:42, Epislon: 0.010
Epoch: 170/400, Reward: -15.00, Steps:15, Epislon: 0.010
Epoch: 180/400, Reward: -28.00, Steps:28, Epislon: 0.010
Epoch: 190/400, Reward: -22.00, Steps:22, Epislon: 0.010
Epoch: 200/400, Reward: -14.00, Steps:14, Epislon: 0.010
Epoch: 210/400, Reward: -26.00, Steps:26, Epislon: 0.010
Epoch: 220/400, Reward: -16.00, Steps:16, Epislon: 0.010
Epoch: 230/400, Reward: -28.00, Steps:28, Epislon: 0.010
Epoch: 240/400, Reward: -15.00, Steps:15, Epislon: 0.010
Epoch: 250/400, Reward: -16.00, Steps:16, Epislon: 0.010
Epoch: 260/400, Reward: -25.00, Steps:25, Epislon: 0.010
Epoch: 270/400, Reward: -18.00, Steps:18, Epislon: 0.010
Epoch: 280/400, Reward: -113.00, Steps:14, Epislon: 0.010
Epoch: 290/400, Reward: -14.00, Steps:14, Epislon: 0.010
Epoch: 300/400, Reward: -127.00, Steps:28, Epislon: 0.010
Epoch: 310/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 320/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 330/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 340/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 350/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 360/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 370/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 380/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 390/400, Reward: -13.00, Steps:13, Epislon: 0.010
Epoch: 400/400, Reward: -13.00, Steps:13, Epislon: 0.010
智能体训练结束 , 用时: 0.38664937019348145 s
============================================================================================================================================================================================================================================================================================================
状态数: 48, 动作数: 4
开始测试智能体......
环境名: CliffWalking-v0, 算法名: Q-learning, Device: cpu
Epochs: 1/20, Steps:13, Reward: -13.00
Epochs: 2/20, Steps:13, Reward: -13.00
Epochs: 3/20, Steps:13, Reward: -13.00
Epochs: 4/20, Steps:13, Reward: -13.00
Epochs: 5/20, Steps:13, Reward: -13.00
Epochs: 6/20, Steps:13, Reward: -13.00
Epochs: 7/20, Steps:13, Reward: -13.00
Epochs: 8/20, Steps:13, Reward: -13.00
Epochs: 9/20, Steps:13, Reward: -13.00
Epochs: 10/20, Steps:13, Reward: -13.00
Epochs: 11/20, Steps:13, Reward: -13.00
Epochs: 12/20, Steps:13, Reward: -13.00
Epochs: 13/20, Steps:13, Reward: -13.00
Epochs: 14/20, Steps:13, Reward: -13.00
Epochs: 15/20, Steps:13, Reward: -13.00
Epochs: 16/20, Steps:13, Reward: -13.00
Epochs: 17/20, Steps:13, Reward: -13.00
Epochs: 18/20, Steps:13, Reward: -13.00
Epochs: 19/20, Steps:13, Reward: -13.00
Epochs: 20/20, Steps:13, Reward: -13.00
测试结束 , 用时: 16.472306728363037 s

奖励曲线:

在这里插入图片描述

测试阶段的寻路过程可视化

在这里插入图片描述

可以看出Q-Learning算法相比Sarsa算法而言就比较激进了,Sarsa算法只敢离悬崖远一点走,而Q-Learning算法却敢贴着悬崖边走,说明了off-policy在探险方面更具优势,具有更强的探险能力。

2.4 关于可视化寻路过程的设置

如果你觉得寻路过程可视化比较耗时,你可以进行设置,取消可视化。
或者你想看看训练过程的可视化,也可以进行相关设置(当然我看过了,前期很无聊,智能体一直在乱走)

在这里插入图片描述


http://www.ppmy.cn/news/155003.html

相关文章

耳机灵敏度与阻抗

耳机灵敏度与阻抗 灵敏度与阻抗是一对互相矛盾的参数, 灵敏度越大,越能够用小功率发出大声音,但微小的干扰电流就会产生杂音。 阻抗相反,它让电流更难“推动”耳机发声,但却可以有效地避免干扰,让声音更纯…

入耳式无线蓝牙耳机哪款好?入耳式音质好的蓝牙耳机推荐

入耳式蓝牙耳机以设计优势,密闭性好,漏音少,音质保真度高,更容易体验到低音的冲击感,但是也因为设计,导致很多入耳式佩戴不是那么的舒服,不适合长时间佩戴。下面我给大家推荐四款比较好的入耳式…

哪款蓝牙耳机戴着舒服?佩戴舒适度高的四款蓝牙耳机推荐

众所周知,一款戴着舒服的蓝牙耳机不仅能提升我们的佩戴体验,还能减少耳机对耳朵造成的伤害。所以,一款佩戴舒适度高的蓝牙耳机应该成为我们挑选蓝牙耳机时的关键要素之一。那么,在众多的蓝牙耳机当中,哪款蓝牙耳机戴着…

什么耳机戴着舒服耳朵不疼?不塞耳道的骨传导耳机

佩戴传统的入耳式气导耳机可能会使得耳朵有些伤害,长时间用耳机会听力受损,达到不可逆的伤害,过大的音量也可能会伤害到鼓膜。骨导耳机利用骨传导技术受话,紧贴骨头,声波直接通过骨头传至听神经。因此可以开放双耳&…

什么耳机戴着舒服耳朵不疼?五款久戴不疼的骨传导耳机推荐

长时间佩戴蓝牙耳机耳朵会出现不适和胀痛,那是因为耳膜长时间处于工作状态进而疲劳并反应出不适,同时对耳膜造成一定程度的损伤。骨传导耳机作为久戴不痛并且保护耳朵的特性,被广大人群知晓,并开始逐渐从佩戴蓝牙耳机到替换成骨传…

空气传导和骨传导耳机哪个好?这两种耳机有什么区别?

对于一些长时间使用入耳式耳机导致耳道酸胀的人来说,耳机的选择就变得尤为重要。近年来,骨传导耳机就以“黑马”一样迅速占领耳机市场,骨传导耳机具有普通耳机的功能,并且还避开了蓝牙耳机的缺点,解决了长时间戴耳机&a…

挂耳式耳机品牌排行榜,几款佩戴舒适的挂耳式耳机推荐

随着人们对健康意识的逐步加深,人们越来越重视自身的健康,运动也成为了不少人每日必须进行的时间。而我在运动的时候喜欢听音乐,特别是在户外运动的时候,戴着耳机听音乐的同时还能够感受周围环境的美妙声音,而在耳机方…

蓝牙耳机气传导哪个牌子好,试试这几款不入耳的气传导耳机

每一个运动爱好者从来都不会单纯地跑步,总喜欢随着音乐旋律而动。作为夜跑爱好者,我在小区里跑步时,总是戴着耳机运动,晚饭后小区内到处是饭后走动、运动的人群,稍不注意可能就会撞到人,为了更好的识别身边…