High-Performance PyTorch Code for the DDPG Deep Reinforcement Learning Algorithm (Adapted from spinningup, Low Environment Dependencies, ...)
Preface
1. Implementations of the various DRL algorithms are scattered all over GitHub, for example MoFan's (莫凡) tutorials, among others.
2. Much of that code is not the best implementation of the original algorithm, and the implementation details often differ; it is not recommended for direct use in research.
3. The code in this post is adapted from the OpenAI spinningup source, so code performance is no longer something you need to worry about.
4. Why adapt it? Because the original source depends on too many packages and environments, is hard for beginners to read, and the extensive logger code is a headache.
5. The code in this post reduces the environment dependencies to a minimum and drops some auxiliary features, making it much easier to read.
6. If this code still fails to converge after you port it to your own environment, the problem lies in your reward design or your overall approach.
The project consists of three files: main.py, DDPGModel.py, core.py
Python 3.6
DDPGModel.py
import numpy as np
from copy import deepcopy
from torch.optim import Adam
import torch
import core as core
class ReplayBuffer:  # inputs: size; the obs dimension, e.g. (3,), which is unpacked to 3 internally; and the action dimension, e.g. 3
    """
    A simple FIFO experience replay buffer for DDPG agents.
    """

    def __init__(self, obs_dim, act_dim, size):
        self.obs_buf = np.zeros(core.combined_shape(size, obs_dim), dtype=np.float32)
        self.obs2_buf = np.zeros(core.combined_shape(size, obs_dim), dtype=np.float32)
        self.act_buf = np.zeros(core.combined_shape(size, act_dim), dtype=np.float32)
        self.rew_buf = np.zeros(size, dtype=np.float32)
        self.done_buf = np.zeros(size, dtype=np.float32)
        self.ptr, self.size, self.max_size = 0, 0, size

    def store(self, obs, act, rew, next_obs, done):
        self.obs_buf[self.ptr] = obs
        self.obs2_buf[self.ptr] = next_obs
        self.act_buf[self.ptr] = act
        self.rew_buf[self.ptr] = rew
        self.done_buf[self.ptr] = done
        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def sample_batch(self, batch_size=32):
        idxs = np.random.randint(0, self.size, size=batch_size)
        batch = dict(obs=self.obs_buf[idxs],
                     obs2=self.obs2_buf[idxs],
                     act=self.act_buf[idxs],
                     rew=self.rew_buf[idxs],
                     done=self.done_buf[idxs])
        return {k: torch.as_tensor(v, dtype=torch.float32) for k, v in batch.items()}
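# A quick usage sketch of the buffer on its own (values are purely illustrative; in the
# actual training loop, main.py stores Pendulum transitions via ddpg.replay_buffer.store):
#
#   buf = ReplayBuffer(obs_dim=(3,), act_dim=1, size=1000)
#   buf.store(obs=np.ones(3), act=np.array([0.5]), rew=-1.0, next_obs=np.zeros(3), done=False)
#   batch = buf.sample_batch(batch_size=32)  # dict of float32 tensors: obs, obs2, act, rew, done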
class DDPG:
    def __init__(self, obs_dim, act_dim, act_bound, actor_critic=core.MLPActorCritic, seed=0,
                 replay_size=int(1e6), gamma=0.99, polyak=0.995, pi_lr=1e-3, q_lr=1e-3, act_noise=0.1):
        self.obs_dim = obs_dim
        self.act_dim = act_dim
        self.act_bound = act_bound
        self.gamma = gamma
        self.polyak = polyak
        self.act_noise = act_noise

        torch.manual_seed(seed)
        np.random.seed(seed)

        self.ac = actor_critic(obs_dim, act_dim, act_limit=2.0)
        self.ac_targ = deepcopy(self.ac)

        # experience replay buffer (main.py stores transitions into it via ddpg.replay_buffer.store)
        self.replay_buffer = ReplayBuffer(obs_dim=obs_dim, act_dim=act_dim, size=replay_size)

        self.pi_optimizer = Adam(self.ac.pi.parameters(), lr=pi_lr)
        self.q_optimizer = Adam(self.ac.q.parameters(), lr=q_lr)

        # Freeze target networks with respect to optimizers (only update via polyak averaging)
        for p in self.ac_targ.parameters():
            p.requires_grad = False
    def compute_loss_q(self, data):  # returns the Q-network loss (the TD error against the Bellman backup)
        o, a, r, o2, d = data['obs'], data['act'], data['rew'], data['obs2'], data['done']

        q = self.ac.q(o, a)

        # Bellman backup for Q function
        with torch.no_grad():
            q_pi_targ = self.ac_targ.q(o2, self.ac_targ.pi(o2))
            backup = r + self.gamma * (1 - d) * q_pi_targ

        # MSE loss against Bellman backup
        loss_q = ((q - backup) ** 2).mean()
        return loss_q  # no minus sign here, so this is a minimization; TD learning uses the function approximator to fit the backup, and the smaller the error the better

    def compute_loss_pi(self, data):
        o = data['obs']
        q_pi = self.ac.q(o, self.ac.pi(o))
        return -q_pi.mean()  # the minus sign means we maximize q_pi, i.e. maximize the Q-value of the action the current policy takes in the current state
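    # In equation form, the two losses above are the standard DDPG objectives
    # (Q = self.ac.q, mu = self.ac.pi, and the "targ" versions are the target copies):
    #   backup  y = r + gamma * (1 - d) * Q_targ(s', mu_targ(s'))   (computed under torch.no_grad)
    #   loss_q    = E[ (Q(s, a) - y)^2 ]                            (minimized: mean-squared TD error)
    #   loss_pi   = -E[ Q(s, mu(s)) ]                               (minimized, i.e. Q is maximized)
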
    def update(self, data):
        # First run one gradient descent step for Q.
        self.q_optimizer.zero_grad()
        loss_q = self.compute_loss_q(data)
        loss_q.backward()
        self.q_optimizer.step()

        # Freeze Q-network so you don't waste computational effort
        # computing gradients for it during the policy learning step.
        for p in self.ac.q.parameters():
            p.requires_grad = False

        # Next run one gradient descent step for pi.
        self.pi_optimizer.zero_grad()
        loss_pi = self.compute_loss_pi(data)
        loss_pi.backward()
        self.pi_optimizer.step()

        # Unfreeze Q-network so you can optimize it at next DDPG step.
        for p in self.ac.q.parameters():
            p.requires_grad = True

        # Finally, update target networks by polyak averaging.
        with torch.no_grad():
            for p, p_targ in zip(self.ac.parameters(), self.ac_targ.parameters()):
                # NB: We use in-place operations "mul_", "add_" to update target
                # params, as opposed to "mul" and "add", which would make new tensors.
                p_targ.data.mul_(self.polyak)
                p_targ.data.add_((1 - self.polyak) * p.data)
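    # The last block is the usual polyak (soft) target update; with self.polyak = 0.995
    # the target networks trail the learned networks slowly:
    #   theta_targ <- polyak * theta_targ + (1 - polyak) * theta
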
    def get_action(self, o, noise_scale):
        a = self.ac.act(torch.as_tensor(o, dtype=torch.float32))
        a += noise_scale * np.random.randn(self.act_dim)
        return np.clip(a, self.act_bound[0], self.act_bound[1])
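
# Note on get_action: passing noise_scale=0 gives the deterministic policy output, which is
# what you would use for evaluation (my reading of the code, not stated explicitly in the post):
#   a_explore = ddpg.get_action(o, ddpg.act_noise)  # training: action + N(0, 0.1^2) noise, clipped to act_bound
#   a_eval    = ddpg.get_action(o, 0)               # evaluation: pure deterministic policy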
core.py
import numpy as np
import scipy.signal
import torch
import torch.nn as nn
def combined_shape(length, shape=None):  # returns a shape tuple such as (length,) or (length, dim)
    if shape is None:
        return (length,)
    return (length, shape) if np.isscalar(shape) else (length, *shape)  # () acts as the tuple constructor; the * flattens the extra dimensions of shape into the tuple
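# A few illustrative calls (my own sanity check, not part of the original):
#   combined_shape(1000)        -> (1000,)    used for the scalar rew/done buffers
#   combined_shape(1000, 3)     -> (1000, 3)  scalar shape
#   combined_shape(1000, (3,))  -> (1000, 3)  tuple shape, flattened by the *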
def mlp(sizes, activation, output_activation=nn.Identity):
    layers = []
    for j in range(len(sizes) - 1):
        act = activation if j < len(sizes) - 2 else output_activation
        layers += [nn.Linear(sizes[j], sizes[j + 1]), act()]
    return nn.Sequential(*layers)
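# As a sketch of what mlp builds: with the Pendulum sizes used later (obs_dim=3, act_dim=1,
# hidden_sizes=(256, 256)), the critic body mlp([4, 256, 256, 1], nn.ReLU) is equivalent to
#   nn.Sequential(
#       nn.Linear(4, 256), nn.ReLU(),
#       nn.Linear(256, 256), nn.ReLU(),
#       nn.Linear(256, 1), nn.Identity(),   # output_activation defaults to nn.Identity
#   )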
def count_vars(module):
    return sum([np.prod(p.shape) for p in module.parameters()])
class MLPActor(nn.Module):

    def __init__(self, obs_dim, act_dim, hidden_sizes, activation, act_limit):
        super().__init__()
        pi_sizes = [obs_dim] + list(hidden_sizes) + [act_dim]
        self.pi = mlp(pi_sizes, activation, nn.Tanh)
        self.act_limit = act_limit

    def forward(self, obs):
        # Return output from network scaled to action space limits.
        return self.act_limit * self.pi(obs)
class MLPQFunction(nn.Module):

    def __init__(self, obs_dim, act_dim, hidden_sizes, activation):
        super().__init__()
        self.q = mlp([obs_dim + act_dim] + list(hidden_sizes) + [1], activation)

    def forward(self, obs, act):
        q = self.q(torch.cat([obs, act], dim=-1))
        return torch.squeeze(q, -1)  # Critical to ensure q has right shape.
class MLPActorCritic(nn.Module):

    def __init__(self, obs_dim, act_dim, hidden_sizes=(256, 256),
                 activation=nn.ReLU, act_limit=2.0):
        super().__init__()

        # build policy and value functions
        self.pi = MLPActor(obs_dim, act_dim, hidden_sizes, activation, act_limit)
        self.q = MLPQFunction(obs_dim, act_dim, hidden_sizes, activation)

    def act(self, obs):
        with torch.no_grad():
            return self.pi(obs).numpy()
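
# A minimal sketch of how these pieces fit together for the Pendulum setup in main.py
# (obs_dim=3, act_dim=1, act_limit=2.0); the dummy observation is my own example:
#   ac  = MLPActorCritic(obs_dim=3, act_dim=1, act_limit=2.0)
#   obs = torch.zeros(3)
#   a   = ac.act(obs)                                   # numpy action in [-2.0, 2.0], no grad tracked
#   q   = ac.q(obs.unsqueeze(0),
#              torch.as_tensor(a, dtype=torch.float32).unsqueeze(0))   # Q(s, a), shape (1,)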
main.py
from DDPGModel import *
import gym
import matplotlib.pyplot as plt
if __name__ == '__main__':
    env = gym.make('Pendulum-v0')
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.shape[0]
    act_bound = [-env.action_space.high[0], env.action_space.high[0]]

    ddpg = DDPG(obs_dim, act_dim, act_bound)

    MAX_EPISODE = 100
    MAX_STEP = 500
    update_every = 50
    batch_size = 100

    rewardList = []
    for episode in range(MAX_EPISODE):
        o = env.reset()
        ep_reward = 0
        for j in range(MAX_STEP):
            # episodes 0-20: random actions to fill the replay buffer; afterwards: policy + exploration noise
            if episode > 20:
                a = ddpg.get_action(o, ddpg.act_noise)
            else:
                a = env.action_space.sample()
            o2, r, d, _ = env.step(a)
            ddpg.replay_buffer.store(o, a, r, o2, d)

            if episode >= 10 and j % update_every == 0:
                for _ in range(update_every):
                    batch = ddpg.replay_buffer.sample_batch(batch_size)
                    ddpg.update(data=batch)

            o = o2
            ep_reward += r

            if d:
                break
        print('Episode:', episode, 'Reward: %i' % int(ep_reward))
        rewardList.append(ep_reward)

    plt.figure()
    plt.plot(np.arange(len(rewardList)), rewardList)
    plt.show()
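If you want to watch the trained policy after the reward curve is drawn, a minimal rollout sketch would look like the lines below (the render/close calls and the 5-episode count are my additions, not part of the original script):

    # optional: roll out the trained deterministic policy (no exploration noise)
    for _ in range(5):
        o, d = env.reset(), False
        while not d:
            env.render()
            o, r, d, _ = env.step(ddpg.get_action(o, 0))
    env.close()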
Comparison