一、概述
该代码主要实现了在一个简单的网格世界环境中,使用蒙特卡洛方法来求解最优值函数和最优策略。网格世界由一个固定大小的方格组成,智能体可以在其中执行特定的动作(上、下、左、右),并根据状态转移规则和奖励函数获得相应的奖励。蒙特卡洛方法通过采样多个情节(episode),根据每个情节中状态的回报来更新值函数,并通过策略改进逐步得到最优策略。
二、依赖库
numpy
:用于数值计算,包括数组操作、求均值等。matplotlib.pyplot
:用于数据可视化,绘制值函数和最优策略的图形。
三、代码详解
1. 环境参数定义
# 定义网格世界的大小
GRID_SIZE = 5
ACTIONS = ['up', 'down', 'left', 'right'] # 可能的动作
REWARD_END = 10 # 到达终点的奖励
REWARD_STEP = -1 # 每步的惩罚
GAMMA = 0.9 # 折扣因子# 定义网格世界的起点和终点
START = (0, 0)
END = (GRID_SIZE - 1, GRID_SIZE - 1)
GRID_SIZE
:表示网格世界的边长,这里设置为 5,即网格世界是一个 5x5 的方格。ACTIONS
:列出了智能体在网格世界中可以采取的四个动作,分别是向上、向下、向左、向右。REWARD_END
:当智能体到达终点时所获得的奖励值,设定为 10。REWARD_STEP
:智能体在每一步移动时所受到的惩罚值,为 - 1。GAMMA
:折扣因子,用于计算未来奖励的现值,取值为 0.9。START
:定义了智能体的起始状态,坐标为 (0, 0),即网格世界的左上角方格。END
:定义了目标状态,坐标为 (GRID_SIZE - 1, GRID_SIZE - 1),即网格世界的右下角方格。
2. 状态转移函数
def transition(state, action):"""根据当前状态和动作,返回下一个状态。:param state: 当前状态 (x, y):param action: 动作 ('up', 'down', 'left', 'right'):return: 下一个状态 (x, y)"""x, y = stateif action == 'up':x = max(x - 1, 0)elif action == 'down':x = min(x + 1, GRID_SIZE - 1)elif action == 'left':y = max(y - 1, 0)elif action == 'right':y = min(y + 1, GRID_SIZE - 1)return (x, y)
该函数根据输入的当前状态state
(一个坐标元组 (x, y))和动作action
,计算并返回下一个状态。通过对当前状态的坐标进行相应的增减操作来实现状态转移,同时确保坐标值不会超出网格世界的边界范围。
3. 奖励函数
def reward(state, next_state):"""根据当前状态和下一个状态,返回奖励。:param state: 当前状态 (x, y):param next_state: 下一个状态 (x, y):return: 奖励值"""if next_state == END:return REWARD_ENDelse:return REWARD_STEP
此函数根据当前状态state
和下一个状态next_state
,返回相应的奖励值。如果下一个状态是目标状态END
,则返回到达终点的奖励REWARD_END
;否则返回每步的惩罚值REWARD_STEP
。
4. 初始化值函数和策略
def initialize():"""初始化值函数和策略。:return: 值函数和策略"""states = [(x, y) for x in range(GRID_SIZE) for y in range(GRID_SIZE)]V = {state: 0 for state in states} # 值函数policy = {state: np.random.choice(ACTIONS) for state in states} # 随机策略policy[END] = None # 终点没有动作return V, policy
- 首先生成网格世界中的所有状态,存储在
states
列表中。 - 初始化值函数
V
,将每个状态的值初始化为 0。 - 初始化策略
policy
,为每个状态随机选择一个动作(从ACTIONS
中随机选取),但终点状态END
设置为None
,表示在终点没有动作可执行。 - 最后返回初始化的值函数
V
和策略policy
。
5. 采样一个情节(episode)
def generate_episode(policy):"""根据当前策略采样一个 episode。:param policy: 策略,字典形式,键为状态,值为动作:return: episode,包含状态、动作和奖励的列表"""episode = []state = STARTwhile state != END:action = policy[state]next_state = transition(state, action)r = reward(state, next_state)episode.append((state, action, r))state = next_statereturn episode
- 该函数根据输入的策略
policy
,从起始状态START
开始,不断执行动作并转移状态,直到到达终点状态END
。 - 在每次循环中,获取当前状态对应的动作
action
,计算下一个状态next_state
和奖励r
,并将当前状态、动作和奖励以元组的形式添加到episode
列表中。 - 最终返回一个包含状态、动作和奖励的列表
episode
,表示一个完整的情节。
6. 蒙特卡洛法:更新值函数和策略
def monte_carlo(V, policy, num_episodes=1000):"""使用蒙特卡洛法更新值函数和策略。:param V: 值函数,字典形式,键为状态,值为值函数值:param policy: 策略,字典形式,键为状态,值为动作:param num_episodes: 采样的 episode 数量:return: 更新后的值函数和策略"""returns = {state: [] for state in V.keys()} # 记录每个状态的回报for _ in range(num_episodes):episode = generate_episode(policy)G = 0 # 累积回报# 从后向前遍历 episodefor t in reversed(range(len(episode))):state, action, r = episode[t]G = r + GAMMA * G# 首次访问蒙特卡洛法if state not in [x[0] for x in episode[:t]]:returns[state].append(G)V[state] = np.mean(returns[state])# 策略改进best_action = Nonebest_value = -np.inffor a in ACTIONS:next_state = transition(state, a)value = reward(state, next_state) + GAMMA * V[next_state]if value > best_value:best_value = valuebest_action = apolicy[state] = best_actionreturn V, policy
- 参数说明:
V
:当前的值函数,是一个字典,键为状态,值为对应状态的值函数值。policy
:当前的策略,也是一个字典,键为状态,值为对应状态的动作。num_episodes
:要采样的情节数量,默认为 1000。
- 函数逻辑:
- 初始化
returns
字典,用于记录每个状态的回报列表。 - 循环采样
num_episodes
个情节:- 生成一个情节
episode
。 - 初始化累积回报
G
为 0,从后向前遍历情节中的每个步骤:- 计算当前步骤的回报
G
,通过即时奖励r
和折扣因子GAMMA
更新累积回报。 - 如果当前状态是首次访问(即之前在该情节中未出现过),将当前回报
G
添加到returns[state]
中,并更新值函数V[state]
为该状态所有回报的均值。 - 进行策略改进:遍历所有可能的动作
a
,计算采取该动作后的下一个状态的价值value
,选择使价值最大的动作作为当前状态的新策略policy[state]
。
- 计算当前步骤的回报
- 生成一个情节
- 最后返回更新后的值函数
V
和策略policy
。
- 初始化
7. 生成所有状态
states = [(x, y) for x in range(GRID_SIZE) for y in range(GRID_SIZE)]
使用列表推导式生成一个包含网格世界中所有状态的列表,每个状态由一个坐标元组(x, y)
表示。
8. 初始化值函数和策略
V, policy = initialize()
调用initialize
函数,初始化值函数V
和策略policy
。
9. 使用蒙特卡洛法更新值函数和策略
V, policy = monte_carlo(V, policy, num_episodes=1000)
调用monte_carlo
函数,传入初始化的值函数V
、策略policy
和情节数量 1000,对值函数和策略进行更新。
10. 可视化值函数
# 可视化值函数
grid_value = np.zeros((GRID_SIZE, GRID_SIZE))
for state in states:grid_value[state] = V[state]plt.imshow(grid_value, cmap='viridis')
plt.colorbar(label='Value')
plt.title('State Value Function (Monte Carlo)')
plt.show()
将更新后的值函数V
转换为一个二维数组grid_value
,其中每个元素对应网格世界中一个状态的值函数。使用matplotlib.pyplot
的imshow
函数绘制该二维数组的热图,并添加颜色条和标题,以可视化值函数。
11. 可视化最优策略
# 可视化最优策略
grid_policy = np.empty((GRID_SIZE, GRID_SIZE), dtype=object)
for state in states:grid_policy[state] = policy[state]fig, ax = plt.subplots()
ax.imshow(np.ones((GRID_SIZE, GRID_SIZE)), cmap='gray')
for state in states:if state == END:ax.text(state[1], state[0], 'END', ha='center', va='center')continueaction = policy[state]if action == 'up':ax.arrow(state[1], state[0], 0, -0.3, head_width=0.2, head_length=0.1, fc='r', ec='r')elif action == 'down':ax.arrow(state[1], state[0], 0, 0.3, head_width=0.2, head_length=0.1, fc='r', ec='r')elif action == 'left':ax.arrow(state[1], state[0], -0.3, 0, head_width=0.2, head_length=0.1, fc='r', ec='r')elif action == 'right':ax.arrow(state[1], state[0], 0.3, 0, head_width=0.2, head_length=0.1, fc='r', ec='r')
ax.set_title('Optimal Policy (Monte Carlo)')
plt.show()
将更新后的最优策略policy
转换为一个二维数组grid_policy
,其中每个元素对应网格世界中一个状态的最优动作。使用matplotlib.pyplot
绘制一个灰色背景的图形,并在每个状态上绘制箭头表示最优动作(如果是目标状态,则显示'END'
),以可视化最优策略。
四、注意事项
- 代码中的蒙特卡洛方法采用了首次访问蒙特卡洛法,这意味着每个情节中每个状态只在首次出现时用于更新值函数。
- 网格世界的定义和参数(如大小、奖励、折扣因子等)可以根据具体问题进行调整。
- 可视化部分使用了
matplotlib
库,确保该库已经正确安装并配置。如果在运行代码时遇到图形显示问题,可以检查matplotlib
的设置或尝试其他可视化工具。 - 蒙特卡洛方法的性能和收敛速度可能受到采样情节数量的影响,可根据实际情况调整
num_episodes
参数。
完整代码
python">import numpy as np
import matplotlib.pyplot as plt# 定义网格世界的大小
GRID_SIZE = 5
ACTIONS = ['up', 'down', 'left', 'right'] # 可能的动作
REWARD_END = 10 # 到达终点的奖励
REWARD_STEP = -1 # 每步的惩罚
GAMMA = 0.9 # 折扣因子# 定义网格世界的起点和终点
START = (0, 0)
END = (GRID_SIZE - 1, GRID_SIZE - 1)# 定义状态转移函数
def transition(state, action):"""根据当前状态和动作,返回下一个状态。:param state: 当前状态 (x, y):param action: 动作 ('up', 'down', 'left', 'right'):return: 下一个状态 (x, y)"""x, y = stateif action == 'up':x = max(x - 1, 0)elif action == 'down':x = min(x + 1, GRID_SIZE - 1)elif action == 'left':y = max(y - 1, 0)elif action == 'right':y = min(y + 1, GRID_SIZE - 1)return (x, y)# 定义奖励函数
def reward(state, next_state):"""根据当前状态和下一个状态,返回奖励。:param state: 当前状态 (x, y):param next_state: 下一个状态 (x, y):return: 奖励值"""if next_state == END:return REWARD_ENDelse:return REWARD_STEP# 初始化值函数和策略
def initialize():"""初始化值函数和策略。:return: 值函数和策略"""states = [(x, y) for x in range(GRID_SIZE) for y in range(GRID_SIZE)]V = {state: 0 for state in states} # 值函数policy = {state: np.random.choice(ACTIONS) for state in states} # 随机策略policy[END] = None # 终点没有动作return V, policy# 采样一个 episode
def generate_episode(policy):"""根据当前策略采样一个 episode。:param policy: 策略,字典形式,键为状态,值为动作:return: episode,包含状态、动作和奖励的列表"""episode = []state = STARTwhile state != END:action = policy[state]next_state = transition(state, action)r = reward(state, next_state)episode.append((state, action, r))state = next_statereturn episode# 蒙特卡洛法:更新值函数和策略
def monte_carlo(V, policy, num_episodes=1000):"""使用蒙特卡洛法更新值函数和策略。:param V: 值函数,字典形式,键为状态,值为值函数值:param policy: 策略,字典形式,键为状态,值为动作:param num_episodes: 采样的 episode 数量:return: 更新后的值函数和策略"""returns = {state: [] for state in V.keys()} # 记录每个状态的回报for _ in range(num_episodes):episode = generate_episode(policy)G = 0 # 累积回报# 从后向前遍历 episodefor t in reversed(range(len(episode))):state, action, r = episode[t]G = r + GAMMA * G# 首次访问蒙特卡洛法if state not in [x[0] for x in episode[:t]]:returns[state].append(G)V[state] = np.mean(returns[state])# 策略改进best_action = Nonebest_value = -np.inffor a in ACTIONS:next_state = transition(state, a)value = reward(state, next_state) + GAMMA * V[next_state]if value > best_value:best_value = valuebest_action = apolicy[state] = best_actionreturn V, policy# 生成所有状态
states = [(x, y) for x in range(GRID_SIZE) for y in range(GRID_SIZE)]# 初始化值函数和策略
V, policy = initialize()# 使用蒙特卡洛法更新值函数和策略
V, policy = monte_carlo(V, policy, num_episodes=1000)# 可视化值函数
grid_value = np.zeros((GRID_SIZE, GRID_SIZE))
for state in states:grid_value[state] = V[state]plt.imshow(grid_value, cmap='viridis')
plt.colorbar(label='Value')
plt.title('State Value Function (Monte Carlo)')
plt.show()# 可视化最优策略
grid_policy = np.empty((GRID_SIZE, GRID_SIZE), dtype=object)
for state in states:grid_policy[state] = policy[state]fig, ax = plt.subplots()
ax.imshow(np.ones((GRID_SIZE, GRID_SIZE)), cmap='gray')
for state in states:if state == END:ax.text(state[1], state[0], 'END', ha='center', va='center')continueaction = policy[state]if action == 'up':ax.arrow(state[1], state[0], 0, -0.3, head_width=0.2, head_length=0.1, fc='r', ec='r')elif action == 'down':ax.arrow(state[1], state[0], 0, 0.3, head_width=0.2, head_length=0.1, fc='r', ec='r')elif action == 'left':ax.arrow(state[1], state[0], -0.3, 0, head_width=0.2, head_length=0.1, fc='r', ec='r')elif action == 'right':ax.arrow(state[1], state[0], 0.3, 0, head_width=0.2, head_length=0.1, fc='r', ec='r')
ax.set_title('Optimal Policy (Monte Carlo)')
plt.show()