DQN Reinforcement Learning


This is the first reinforcement learning environment I have written myself. It still has plenty of rough edges and is being improved step by step.
I hope to have it finished within two weeks.


import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import matplotlib.pyplot as plt
import time
from tqdm import tqdm
import pandas as pd


def moving_average(data, window_size):
    """Smoothing helper: simple moving average over a sliding window.
    :param data: list of numbers to smooth
    :param window_size: size of the averaging window
    :return: list of moving-average values
    """
    if window_size <= 0:
        raise ValueError("Window size should be greater than 0.")
    if window_size > len(data):
        raise ValueError("Window size should not be greater than the length of data.")
    # Cumulative sum of data elements
    cumsum = [0]
    for i, x in enumerate(data):
        cumsum.append(cumsum[i] + x)
    # Compute moving averages
    ma_values = []
    for i in range(len(data) - window_size + 1):
        average = (cumsum[i + window_size] - cumsum[i]) / window_size
        ma_values.append(average)
    return ma_values


def plot_data(data, title="Data Plot", x_label="X-axis", y_label="Y-axis"):
    """Plot a simple line graph based on the provided data.
    :param data: list of integers or floats to be plotted
    :param title: title of the plot
    :param x_label: label for the x-axis
    :param y_label: label for the y-axis
    :return: None
    """
    plt.figure(figsize=(10, 5))  # Set the figure size
    plt.plot(data)  # Plot the data
    plt.title(title)  # Set the title
    plt.xlabel(x_label)  # Set x-axis label
    plt.ylabel(y_label)  # Set y-axis label
    plt.grid(True, which='both', linestyle='--', linewidth=0.5)  # Add a grid
    plt.tight_layout()  # Adjust subplot parameters to give specified padding
    plt.show()


class TransportMatchingEnv:
    def __init__(self, num_drivers=5, num_goods=5, max_price=10, max_time=5):
        """
        :param num_drivers: number of trucks/drivers
        :param num_goods: number of goods
        :param max_price: maximum price
        :param max_time: maximum delivery time
        """
        self.num_drivers = num_drivers
        self.num_goods = num_goods
        self.max_price = max_price
        self.max_time = max_time
        # Action space: every (good, driver, price, time) combination flattened into one index
        self.action_dim = self.num_drivers * self.num_goods * self.max_price * self.max_time
        # Current negotiation state  TODO: the state still needs more components
        self.current_negotiation = None
        # Combined state vector
        self.combined_state = self.reset()
        # Distance matrix between goods and drivers
        self.distance_matrix = np.random.randint(0, 100, (self.num_goods, self.num_drivers))
        # Shippers' preferred arrival times
        self.goods_time_preferences = np.random.randint(0, self.max_time, self.num_goods)
        # Shippers' expected prices
        self.goods_expected_prices = np.random.randint(0, self.max_price, self.num_goods)
        # Whether each driver is currently available
        self.driver_availabilities = np.random.choice([0, 1], self.num_drivers)
        # Whether each good has special handling requirements
        self.goods_special_requirements = np.random.choice([0, 1], self.num_goods)
        # Whether each driver can handle special goods (one flag per driver)
        self.driver_special_capabilities = np.random.choice([0, 1], self.num_drivers)

    def decode_action(self, encoded_action):
        """Decode a flat action index into a human-readable tuple.
        :param encoded_action: integer in [0, action_dim)
        :return: (driver_index, good_index, price, time)
        """
        total_actions_for_price_time = self.max_price * self.max_time
        total_actions_per_good = self.num_drivers * total_actions_for_price_time
        total_actions = self.num_goods * total_actions_per_good
        if encoded_action >= total_actions:
            raise ValueError("Encoded action is out of bounds!")
        good_index = encoded_action // total_actions_per_good
        residual = encoded_action % total_actions_per_good
        driver_index = residual // total_actions_for_price_time
        residual = residual % total_actions_for_price_time
        price = residual // self.max_time
        time = residual % self.max_time
        return driver_index, good_index, price, time

    def compute_reward(self, driver_index, good_index, price, time):
        """Compute the reward for matching a driver to a good at a given price and time.
        :param driver_index: index of the driver
        :param good_index: index of the good
        :param price: offered price
        :param time: offered delivery time
        :return: scalar reward
        """
        # 1. Distance factor
        distance = self.distance_matrix[good_index][driver_index]
        distance_factor = -distance  # negative reward for longer distances
        # 2. Time factor
        delivery_time_preference = self.goods_time_preferences[good_index]
        time_penalty = -abs(delivery_time_preference - time) * 2  # penalize deviation from the preferred time
        # 3. Price factor
        expected_price = self.goods_expected_prices[good_index]
        price_difference = price - expected_price
        price_factor = -abs(price_difference)  # prefer prices close to the expected price
        # 4. Availability of the driver
        driver_availability = self.driver_availabilities[driver_index]  # 0 = not available, 1 = available
        availability_factor = driver_availability * 10  # bonus for available drivers
        # 5. Special requirements
        good_requirement = self.goods_special_requirements[good_index]  # 1 = needs special handling
        driver_capability = self.driver_special_capabilities[driver_index]  # 1 = can handle special goods
        requirement_factor = 0
        if good_requirement > 0 and driver_capability < good_requirement:
            requirement_factor = -20  # large penalty if the driver cannot meet the special requirement
        total_reward = distance_factor + time_penalty + price_factor + availability_factor + requirement_factor
        return total_reward

    def reset(self):
        """Reset the environment and return the initial combined state.
        :return: flattened state vector
        """
        random.seed(0)
        self.current_negotiation = np.zeros((self.num_goods, self.num_drivers))
        # Refresh all the parameters every time the environment is reset
        self.distance_matrix = np.random.randint(0, 100, (self.num_goods, self.num_drivers))
        self.goods_time_preferences = np.random.randint(0, self.max_time, self.num_goods)
        self.goods_expected_prices = np.random.randint(0, self.max_price, self.num_goods)
        self.driver_availabilities = np.random.choice([0, 1], self.num_drivers)
        self.goods_special_requirements = np.random.choice([0, 1], self.num_goods)
        self.driver_special_capabilities = np.random.choice([0, 1], self.num_drivers)
        # (Fixed distance/preference arrays can be hard-coded here for reproducible debugging.)
        # Combine everything into a single flattened state
        combined_state = np.concatenate((self.current_negotiation.flatten(),
                                         self.distance_matrix.flatten(),
                                         self.goods_time_preferences,
                                         self.goods_expected_prices,
                                         self.driver_availabilities,
                                         self.goods_special_requirements,
                                         self.driver_special_capabilities))
        return combined_state

    def driver_satisfaction(self, fee_received, expected_fee, distance_travelled, max_distance,
                            wait_time, max_wait_time, goods_condition):
        """Satisfaction score for the driver (carrier) side.
        :param fee_received: fee actually received
        :param expected_fee: expected fee
        :param distance_travelled: distance travelled
        :param max_distance: maximum distance
        :param wait_time: waiting time
        :param max_wait_time: maximum waiting time
        :param goods_condition: condition of the goods
        :return: total satisfaction score
        """
        # Price satisfaction (max weight 40)
        price_satisfaction = (fee_received / expected_fee) * 40
        # Distance satisfaction (max weight 30)
        distance_satisfaction = ((max_distance - distance_travelled) / max_distance) * 30
        # Waiting-time satisfaction (max weight 20)
        wait_satisfaction = ((max_wait_time - wait_time) / max_wait_time) * 20
        # Goods-condition satisfaction (max weight 10)
        goods_satisfaction = 10 if goods_condition == 'good' else 0
        # Total satisfaction
        total_satisfaction = price_satisfaction + distance_satisfaction + wait_satisfaction + goods_satisfaction
        return total_satisfaction

    def shipper_satisfaction(self, fee_paid, expected_fee, delivery_time, expected_delivery_time,
                             goods_condition, driver_service_quality):
        """Satisfaction score for the shipper (cargo owner) side.
        :param fee_paid: fee paid
        :param expected_fee: expected fee
        :param delivery_time: actual delivery time
        :param expected_delivery_time: expected delivery time
        :param goods_condition: condition of the goods
        :param driver_service_quality: driver's service quality (0-100)
        :return: total satisfaction score
        """
        # Price satisfaction (max weight 30)
        price_satisfaction = (expected_fee / fee_paid) * 30
        # Delivery-time satisfaction (max weight 30)
        time_satisfaction = ((expected_delivery_time - delivery_time) / expected_delivery_time) * 30
        # Goods-condition satisfaction (max weight 20)
        goods_satisfaction = 20 if goods_condition == 'good' else 0
        # Service satisfaction (max weight 20)
        service_satisfaction = driver_service_quality * 20 / 100
        # Total satisfaction
        total_satisfaction = price_satisfaction + time_satisfaction + goods_satisfaction + service_satisfaction
        return total_satisfaction

    def successOrFailure(self):
        # Decide whether the negotiation succeeds based on both sides' satisfaction.
        # 1 means the negotiation succeeded, other values mean it failed.
        return 1

    def step(self, encoded_action):
        """TODO: core logic.
        First, make explicit when a negotiation succeeds and when it fails.
        :param encoded_action: flat action index to be decoded
        :return: (combined_state, reward, done, info)
        """
        driver_index, good_index, price, time = self.decode_action(encoded_action)
        # Planned negotiation logic (not implemented yet):
        # if self.current_negotiation[good_index][driver_index] == 1:
        #     # this pair is already matched: zero reward, state unchanged
        #     ...
        # if self.successOrFailure() == 1:
        #     pass  # negotiation succeeded
        # elif self.successOrFailure() == 2:
        #     pass  # negotiation failed; exchange offer and counter-offer
        # else:
        #     pass  # negotiation failed; end immediately
        if price <= self.max_price and time <= self.max_time:  # always true: decode_action keeps both in range
            self.current_negotiation[good_index][driver_index] = 1
        reward = self.compute_reward(driver_index, good_index, price, time)
        combined_state = np.concatenate((self.current_negotiation.flatten(),
                                         self.distance_matrix.flatten(),
                                         self.goods_time_preferences,
                                         self.goods_expected_prices,
                                         self.driver_availabilities,
                                         self.goods_special_requirements,
                                         self.driver_special_capabilities))
        done = np.sum(self.current_negotiation) == self.num_goods
        return combined_state, reward, done, {}

    def render(self):
        print(self.current_negotiation)


# Simple random agent for testing
class RandomAgent:
    def __init__(self, action_dim):
        self.action_dim = action_dim

    def act(self):
        return np.random.choice(self.action_dim)


class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim)
        )

    def forward(self, x):
        return self.fc(x)


class DQNAgent:
    def __init__(self, input_dim, action_dim, gamma=0.99, epsilon=0.99, lr=0.001):
        self.input_dim = input_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.lr = lr
        self.network = DQN(input_dim, action_dim).float().to(device)
        self.target_network = DQN(input_dim, action_dim).float().to(device)
        self.target_network.load_state_dict(self.network.state_dict())
        self.optimizer = optim.Adam(self.network.parameters(), lr=self.lr)
        self.memory = deque(maxlen=2000)

    def act(self, state):
        # epsilon-greedy action selection
        if np.random.random() > self.epsilon:
            state = torch.tensor(np.array([state]), dtype=torch.float32).to(device)
            with torch.no_grad():
                action = self.network(state).argmax().item()
            return action
        else:
            return np.random.choice(self.action_dim)

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def train(self, batch_size=64):
        if len(self.memory) < batch_size:
            return
        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        states = torch.tensor(np.array(states), dtype=torch.float32).to(device)
        actions = torch.tensor(actions, dtype=torch.int64).to(device)
        rewards = torch.tensor(rewards, dtype=torch.float32).to(device)
        next_states = torch.tensor(np.array(next_states), dtype=torch.float32).to(device)
        dones = torch.tensor(dones, dtype=torch.float32).to(device)
        current_values = self.network(states).gather(1, actions.unsqueeze(-1)).squeeze(-1)
        next_values = self.target_network(next_states).max(1)[0].detach()
        target_values = rewards + self.gamma * next_values * (1 - dones)
        loss = nn.MSELoss()(current_values, target_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_network(self):
        self.target_network.load_state_dict(self.network.state_dict())

    def decrease_epsilon(self, decrement_value=0.001, min_epsilon=0.1):
        self.epsilon = max(self.epsilon - decrement_value, min_epsilon)


if __name__ == '__main__':
    start = time.time()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    rewards = []
    env = TransportMatchingEnv(num_drivers=10, num_goods=10)
    agent = DQNAgent(env.combined_state.flatten().shape[0], env.action_dim)
    # Number of training episodes
    episodes = 2000
    for episode in tqdm(range(episodes)):
        state = env.reset()
        done = False
        episode_reward = 0
        total_reward = 0
        while not done:
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            agent.remember(state, action, reward, next_state, done)
            agent.train()
            episode_reward += reward
            total_reward += reward
            state = next_state
            done = done.item()  # convert the numpy bool to a Python bool
        agent.decrease_epsilon()
        rewards.append(total_reward)
        if episode % 50 == 0:
            agent.update_target_network()
        # print(f"Episode {episode + 1}/{episodes} - Reward: {episode_reward}")
    # Save the per-episode rewards
    df = pd.DataFrame(data=rewards)
    # Write the DataFrame to an Excel file
    df.to_excel('sample.xlsx', index=True)
    plot_data(moving_average(data=rewards, window_size=1), title='reward', x_label='epoch', y_label='reward')
    end = time.time()
    print(f'device: {device}')
    print(f'time: {end - start}')
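
As a quick sanity check of the flat action encoding and of a single environment transition, the short sketch below can be run after the script above. It only uses the TransportMatchingEnv class defined there (and numpy, already imported); the environment sizes and the random actions are illustrative, not part of the original script.

# Minimal sanity check for TransportMatchingEnv (illustrative sketch)
check_env = TransportMatchingEnv(num_drivers=3, num_goods=3, max_price=10, max_time=5)
check_state = check_env.reset()
print('state dim:', check_state.shape[0], 'action dim:', check_env.action_dim)

# decode_action inverts the flat index: good -> driver -> price -> time
for encoded in np.random.randint(0, check_env.action_dim, size=3):
    d, g, p, t = check_env.decode_action(int(encoded))
    print(f'action {encoded} -> driver {d}, good {g}, price {p}, time {t}')

# One random transition: the reward combines distance, time, price,
# availability, and special-requirement terms from compute_reward.
next_state, reward, done, _ = check_env.step(int(np.random.randint(check_env.action_dim)))
print('reward:', reward, 'done:', bool(done))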
