Classic Reinforcement Learning Algorithm Notes (8): Solving a POMDP with an LSTM-Augmented A2C
The environment is CartPole-v1. The original observation is a 4-dimensional vector; here the second dimension, the cart velocity, is removed, keeping the cart position, the pole angle, and the pole angular velocity. This turns the task from an MDP into a POMDP (Partially Observable Markov Decision Process).
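As a quick illustration (a minimal sketch of my own, assuming the classic gym API in which env.reset() returns only the observation), the partial observation is obtained simply by deleting index 1 of the full CartPole observation:

import gym
import numpy as np

env = gym.make("CartPole-v1")
obs = env.reset()                # [cart position, cart velocity, pole angle, pole angular velocity]
partial_obs = np.delete(obs, 1)  # drop the cart velocity -> 3-dimensional partial observation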
The full code is as follows:
Import the required packages
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import matplotlib.pyplot as plt
import numpy as np
import math
import random
import os
import gym
Parameter settings
STATE_DIM = 4 - 1    # drop the cart-velocity dimension so the task becomes a POMDP
ACTION_DIM = 2       # size of the action space
NUM_EPISODE = 5000   # number of training episodes
EPISODE_LEN = 1000   # maximum episode length
A_HIDDEN = 40        # number of hidden units in the actor network
C_HIDDEN = 40        # number of hidden units in the critic network
The Actor-Critic networks
# The actor network uses an LSTM + MLP to infer the full state from the partial observations
class ActorNetwork(nn.Module):
    def __init__(self, in_size, hidden_size, out_size):
        super(ActorNetwork, self).__init__()
        self.lstm = nn.LSTM(in_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, out_size)

    def forward(self, x, hidden):
        x, hidden = self.lstm(x, hidden)
        x = self.fc(x)
        x = F.log_softmax(x, 2)  # log(softmax(x)) along the action dimension
        return x, hidden

class ValueNetwork(nn.Module):
    def __init__(self, in_size, hidden_size, out_size):
        super(ValueNetwork, self).__init__()
        self.lstm = nn.LSTM(in_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, out_size)

    def forward(self, x, hidden):
        x, hidden = self.lstm(x, hidden)
        x = self.fc(x)
        return x, hidden
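For orientation, here is a small usage sketch of my own (not part of the original post): with batch_first=True the LSTM expects inputs of shape (batch, seq_len, in_size) and hidden/cell states of shape (num_layers, batch, hidden_size), which is why the code below keeps unsqueezing tensors to (1, 1, ...):

actor = ActorNetwork(in_size=STATE_DIM, hidden_size=A_HIDDEN, out_size=ACTION_DIM)
obs_seq = torch.zeros(1, 1, STATE_DIM)          # (batch=1, seq_len=1, features)
h0 = torch.zeros(1, 1, A_HIDDEN)                # (num_layers=1, batch=1, hidden_size)
c0 = torch.zeros(1, 1, A_HIDDEN)
log_probs, (h1, c1) = actor(obs_seq, (h0, c0))  # log_probs has shape (1, 1, ACTION_DIM)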
Run a single episode of interaction and record the trajectory
def roll_out(actor_network, env, episode_len, value_network, init_state):
    '''
    Roll out at most 1000 frames.
    Returns:
        the state sequence (terminal state excluded)
        the action sequence (one-hot encoded)
        the reward sequence (terminal reward excluded)
        final_r: bootstrap value of the last state (0 if the episode terminated)
        state: the initial state after the environment has been reset
    '''
    states = []
    actions = []
    rewards = []
    is_done = False
    final_r = 0
    state = init_state  # initial state
    # initialize the LSTM hidden states, shape (num_layers=1, batch=1, hidden_size)
    a_hx = torch.zeros(A_HIDDEN).unsqueeze(0).unsqueeze(0)
    a_cx = torch.zeros(A_HIDDEN).unsqueeze(0).unsqueeze(0)
    c_hx = torch.zeros(C_HIDDEN).unsqueeze(0).unsqueeze(0)
    c_cx = torch.zeros(C_HIDDEN).unsqueeze(0).unsqueeze(0)

    for j in range(episode_len):
        states.append(state)
        # this part could also be implemented with torch.distributions.Categorical
        log_softmax_action, (a_hx, a_cx) = actor_network(
            Variable(torch.Tensor([state]).unsqueeze(0)), (a_hx, a_cx))
        softmax_action = torch.exp(log_softmax_action)  # exponentiate the log-softmax so the probabilities are positive
        action = np.random.choice(ACTION_DIM, p=softmax_action.cpu().data.numpy()[0][0])
        # one-hot encode the action
        one_hot_action = [int(k == action) for k in range(ACTION_DIM)]
        next_state, reward, done, _ = env.step(action)
        next_state = np.delete(next_state, 1)  # drop the cart-velocity dimension
        # fix_reward = -10 if done else 1
        actions.append(one_hot_action)
        rewards.append(reward)
        final_state = next_state  # final_state and state refer to the same observation
        state = next_state
        if done:
            is_done = True
            state = env.reset()
            state = np.delete(state, 1)
            a_hx = torch.zeros(A_HIDDEN).unsqueeze(0).unsqueeze(0)
            a_cx = torch.zeros(A_HIDDEN).unsqueeze(0).unsqueeze(0)
            c_hx = torch.zeros(C_HIDDEN).unsqueeze(0).unsqueeze(0)
            c_cx = torch.zeros(C_HIDDEN).unsqueeze(0).unsqueeze(0)
            # print the total score of this episode
            print(j + 1)
            break
    if not is_done:  # if the episode has not ended after 1000 frames, bootstrap the value of the last state with the value network
        c_out, (c_hx, c_cx) = value_network(
            Variable(torch.Tensor([final_state]).unsqueeze(0)), (c_hx, c_cx))
        final_r = c_out.cpu().data.numpy()[0][0][0]  # if the episode ended normally, final_r = 0: the terminal state (cart out of control) is worth 0
    return states, actions, rewards, final_r, state
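The comment in roll_out mentions torch.distributions.Categorical; a sketch of that alternative to the manual np.random.choice sampling (same shapes as above, my own illustration rather than the author's code) could look like this:

from torch.distributions import Categorical

log_softmax_action, (a_hx, a_cx) = actor_network(
    torch.Tensor([state]).unsqueeze(0), (a_hx, a_cx))   # shape (1, 1, ACTION_DIM)
dist = Categorical(logits=log_softmax_action.view(-1))  # log-probabilities are valid logits
action = dist.sample().item()
log_prob = dist.log_prob(torch.tensor(action))          # differentiable log pi(a|s), usable directly in the actor loss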
The function that computes the cumulative discounted reward
def discount_reward(r, gamma, final_r):
    '''
    r: list of per-step rewards
    final_r: scalar bootstrap value of the state following the last reward
    '''
    discounted_r = np.zeros_like(r)
    running_add = final_r
    for t in reversed(range(0, len(r))):
        running_add = running_add * gamma + r[t]
        discounted_r[t] = running_add
    return discounted_r
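As a quick sanity check of the recursion (values computed by hand, gamma = 0.99, no bootstrap value):

discount_reward([1.0, 1.0, 1.0], 0.99, 0)
# returns approximately [2.9701, 1.99, 1.0]:
# each entry is r_t + gamma * r_{t+1} + ... + gamma^(T-1-t) * r_{T-1} + gamma^(T-t) * final_r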
The main function for training and testing
def main():
    # initialize the environment
    env = gym.make("CartPole-v1")
    init_state = env.reset()
    init_state = np.delete(init_state, 1)  # remove the cart-velocity dimension

    # initialize the value (critic) network
    value_network = ValueNetwork(in_size=STATE_DIM, hidden_size=C_HIDDEN, out_size=1)
    value_network_optim = torch.optim.Adam(value_network.parameters(), lr=0.005)
    # initialize the actor network
    actor_network = ActorNetwork(in_size=STATE_DIM, hidden_size=A_HIDDEN, out_size=ACTION_DIM)
    actor_network_optim = torch.optim.Adam(actor_network.parameters(), lr=0.001)

    steps = []
    task_episodes = []
    test_results = []

    for episode in range(NUM_EPISODE):
        # complete one rollout
        states, actions, rewards, final_r, current_state = roll_out(
            actor_network, env, EPISODE_LEN, value_network, init_state)  # states: list of shape [epi_len, 3]
        # initial state of the next rollout
        init_state = current_state
        actions_var = Variable(torch.Tensor(actions).view(-1, ACTION_DIM)).unsqueeze(0)
        states_var = Variable(torch.Tensor(states).view(-1, STATE_DIM)).unsqueeze(0)

        # train the actor network
        a_hx = torch.zeros(A_HIDDEN).unsqueeze(0).unsqueeze(0)
        a_cx = torch.zeros(A_HIDDEN).unsqueeze(0).unsqueeze(0)
        c_hx = torch.zeros(C_HIDDEN).unsqueeze(0).unsqueeze(0)
        c_cx = torch.zeros(C_HIDDEN).unsqueeze(0).unsqueeze(0)
        actor_network_optim.zero_grad()
        # print(states_var.unsqueeze(0).size())
        log_softmax_actions, (a_hx, a_cx) = actor_network(states_var, (a_hx, a_cx))
        vs, (c_hx, c_cx) = value_network(states_var, (c_hx, c_cx))  # state-value estimates
        vs = vs.detach()  # the critic output must not contribute gradients to the actor update
        # compute Q(s,a) and the advantage
        qs = Variable(torch.Tensor(discount_reward(rewards, 0.99, final_r)))
        qs = qs.view(1, -1, 1)
        advantages = qs - vs
        # print('adv,', advantages.shape)
        # log_softmax_actions * actions_var uses the one-hot encoding to pick out the log-probability of the chosen action;
        # summing over the action dimension (dim=2) gives one log-probability per step, weighted by its advantage
        actor_network_loss = -torch.mean(
            torch.sum(log_softmax_actions * actions_var, 2, keepdim=True) * advantages)
        actor_network_loss.backward()
        actor_network_optim.step()
        # train the value network
        value_network_optim.zero_grad()
        target_values = qs
        a_hx = torch.zeros(A_HIDDEN).unsqueeze(0).unsqueeze(0)
        a_cx = torch.zeros(A_HIDDEN).unsqueeze(0).unsqueeze(0)
        c_hx = torch.zeros(C_HIDDEN).unsqueeze(0).unsqueeze(0)
        c_cx = torch.zeros(C_HIDDEN).unsqueeze(0).unsqueeze(0)
        values, (c_hx, c_cx) = value_network(states_var, (c_hx, c_cx))
        criterion = nn.MSELoss()
        value_network_loss = criterion(values, target_values)
        value_network_loss.backward()
        value_network_optim.step()
        # Testing
        if (episode + 1) % 50 == 0:
            result = 0
            test_task = gym.make("CartPole-v1")
            for test_epi in range(10):  # test over 10 episodes
                state = test_task.reset()
                state = np.delete(state, 1)
                a_hx = torch.zeros(A_HIDDEN).unsqueeze(0).unsqueeze(0)
                a_cx = torch.zeros(A_HIDDEN).unsqueeze(0).unsqueeze(0)
                c_hx = torch.zeros(C_HIDDEN).unsqueeze(0).unsqueeze(0)
                c_cx = torch.zeros(C_HIDDEN).unsqueeze(0).unsqueeze(0)
                for test_step in range(500):  # each test episode lasts at most 500 frames
                    log_softmax_actions, (a_hx, a_cx) = actor_network(
                        Variable(torch.Tensor([state]).view(1, 1, 3)), (a_hx, a_cx))
                    softmax_action = torch.exp(log_softmax_actions)
                    # print(softmax_action.data)
                    action = np.argmax(softmax_action.data.numpy()[0])  # greedy action at test time
                    next_state, reward, done, _ = test_task.step(action)
                    next_state = np.delete(next_state, 1)
                    result += reward
                    state = next_state
                    if done:
                        break
            print("episode:", episode + 1, "test result:", result / 10.0)
            steps.append(episode + 1)
            test_results.append(result / 10)
    plt.plot(steps, test_results)
    plt.savefig('training_score.png')

if __name__ == '__main__':
    main()
The experimental results are as follows: