##### DDPG Class #####
# # # # Build the main DDPG class # # # #
class DDPG(nn.Module):
    def __init__(self, act_dim, obs_dim, a_bound):
        super(DDPG, self).__init__()
        # dimensions of the action and observation spaces, and the action bound
        self.act_dim = act_dim
        self.obs_dim = obs_dim
        self.a_bound = a_bound
        # replay-buffer pointer; learning starts once the buffer has been filled
        self.pointer = 0
        # build the four networks: eval and target versions of the actor and the critic
        self.actor_eval = ActorNet(obs_dim, act_dim)
        self.actor_target = ActorNet(obs_dim, act_dim)
        self.critic_eval = CriticNet(obs_dim, act_dim)
        self.critic_target = CriticNet(obs_dim, act_dim)
        # build the experience replay buffer: each row stores (s, a, r, s_)
        self.memory = np.zeros((MEMORY_CAPACITY, obs_dim * 2 + act_dim + 1), dtype=np.float32)
        # build the optimizers and the loss function
        self.actor_optimizer = torch.optim.Adam(self.actor_eval.parameters(), lr=LR_ACTOR)
        self.critic_optimizer = torch.optim.Adam(self.critic_eval.parameters(), lr=LR_CRITIC)
        self.loss_func = nn.MSELoss()
    # action-selection function
    def choose_action(self, obs):
        obs = torch.unsqueeze(torch.FloatTensor(obs), 0)
        action = self.actor_eval(obs)[0].detach()
        return action
    # experience-replay storage function
    def store_transition(self, obs, action, reward, next_obs):
        transition = np.hstack((obs, action, [reward], next_obs))
        index = self.pointer % MEMORY_CAPACITY  # replace the oldest data with new data
        self.memory[index, :] = transition
        self.pointer += 1
    # learning function
    def learn(self):
        # soft update of the target networks: theta_target <- TAU * theta_eval + (1 - TAU) * theta_target
        # state_dict() holds the network's parameters, indexed by name
        for x in self.actor_target.state_dict().keys():
            eval('self.actor_target.' + x + '.data.mul_((1-TAU))')
            eval('self.actor_target.' + x + '.data.add_(TAU*self.actor_eval.' + x + '.data)')
        for x in self.critic_target.state_dict().keys():
            eval('self.critic_target.' + x + '.data.mul_((1-TAU))')
            eval('self.critic_target.' + x + '.data.add_(TAU*self.critic_eval.' + x + '.data)')
        # sample a mini-batch of transitions from the replay buffer
        indices = np.random.choice(MEMORY_CAPACITY, size=BATCH_SIZE)
        batch_trans = self.memory[indices, :]
        # extract data from mini-batch of transitions including s, a, r, s_
        batch_s = torch.FloatTensor(batch_trans[:, :self.obs_dim])
        batch_a = torch.FloatTensor(batch_trans[:, self.obs_dim:self.obs_dim + self.act_dim])
        batch_r = torch.FloatTensor(batch_trans[:, -self.obs_dim - 1:-self.obs_dim])
        batch_s_ = torch.FloatTensor(batch_trans[:, -self.obs_dim:])
        # update of the actor (policy) network: maximize Q(s, pi(s)), i.e. minimize -Q
        action = self.actor_eval(batch_s)
        Q = self.critic_eval(batch_s, action)
        actor_loss = -torch.mean(Q)
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        # update of the critic (value) network: regress Q(s, a) towards r + GAMMA * Q'(s_, pi'(s_))
        act_target = self.actor_target(batch_s_)
        q_tmp = self.critic_target(batch_s_, act_target)
        Q_target = (batch_r + GAMMA * q_tmp).detach()  # the TD target is treated as a constant
        Q_eval = self.critic_eval(batch_s, batch_a)
        td_error = self.loss_func(Q_eval, Q_target)
        self.critic_optimizer.zero_grad()
        td_error.backward()
        self.critic_optimizer.step()
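The class above relies on ActorNet and CriticNet modules defined earlier in the post. As a reference point only, a minimal sketch of what such networks could look like is given below; the single hidden layer of 30 units and the tanh scaling by the action bound are illustrative assumptions, not necessarily the author's exact architecture.

import torch
import torch.nn as nn

class ActorNet(nn.Module):
    # maps a state to a deterministic action scaled into [-a_bound, a_bound]
    def __init__(self, obs_dim, act_dim, a_bound=2.0, hidden=30):
        super(ActorNet, self).__init__()
        self.a_bound = a_bound
        self.fc1 = nn.Linear(obs_dim, hidden)
        self.out = nn.Linear(hidden, act_dim)

    def forward(self, obs):
        x = torch.relu(self.fc1(obs))
        # tanh squashes the output to [-1, 1]; scaling by a_bound gives a valid action
        return torch.tanh(self.out(x)) * self.a_bound

class CriticNet(nn.Module):
    # maps a (state, action) pair to a scalar Q-value
    def __init__(self, obs_dim, act_dim, hidden=30):
        super(CriticNet, self).__init__()
        self.fc_s = nn.Linear(obs_dim, hidden)
        self.fc_a = nn.Linear(act_dim, hidden)
        self.out = nn.Linear(hidden, 1)

    def forward(self, obs, action):
        # merge the state and action pathways before predicting Q(s, a)
        x = torch.relu(self.fc_s(obs) + self.fc_a(action))
        return self.out(x)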
##### Training #####
# # # # Main function # # # #
def main():
    var = 3  # standard deviation of the exploration noise
    agent = DDPG(a_dim, s_dim, a_bound)  # build the DDPG agent
    for episode in range(EPISODES):
        obs = env.reset()
        for step in range(STEPS):
            action = agent.choose_action(obs)  # select an action
            # add exploration noise, then clip to the valid action range
            action = np.clip(np.random.normal(action, var), a_low_bound, a_bound)
            next_obs, reward, done, _ = env.step(action)
            agent.store_transition(obs, action, reward, next_obs)  # store in the replay buffer
            if agent.pointer > MEMORY_CAPACITY:  # once the replay buffer is full
                var *= 0.9995  # gradually reduce the exploration noise
                agent.learn()
            obs = next_obs
            if done:
                break
        # run a test every 20 episodes
        if episode % 20 == 0:
            total_reward = 0
            for i in range(TEST):  # each test averages TEST (5) episodes
                obs = env.reset()
                for j in range(STEPS):
                    action = agent.choose_action(obs)
                    action = np.clip(np.random.normal(action, var), a_low_bound, a_bound)
                    next_obs, reward, done, _ = env.step(action)
                    obs = next_obs
                    total_reward += reward
                    if done:
                        break
            avg_reward = total_reward / TEST  # average test reward
            print('Episode: ', episode, 'Test_reward: ', avg_reward)

if __name__ == '__main__':
    main()
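main() also depends on a number of globals (the environment, its dimensions, and the hyperparameters) that are set up elsewhere in the post. A plausible setup is sketched below; the numeric values are common choices for DDPG on Pendulum-v0 and are assumptions rather than the author's exact settings.

import gym
import numpy as np
import torch
import torch.nn as nn

# hyperparameters (typical values; adjust as needed)
MEMORY_CAPACITY = 10000   # capacity of the experience replay buffer
BATCH_SIZE = 32           # mini-batch size used in learn()
LR_ACTOR = 0.001          # learning rate of the actor
LR_CRITIC = 0.002         # learning rate of the critic
GAMMA = 0.9               # reward discount factor
TAU = 0.01                # soft-update rate of the target networks
EPISODES = 200            # number of training episodes
STEPS = 200               # maximum steps per episode
TEST = 5                  # number of episodes averaged in each evaluation

# environment setup (classic gym API)
env = gym.make('Pendulum-v0')
env = env.unwrapped
s_dim = env.observation_space.shape[0]   # 3 for Pendulum-v0
a_dim = env.action_space.shape[0]        # 1 for Pendulum-v0
a_bound = env.action_space.high          # upper action bound, [2.]
a_low_bound = env.action_space.low       # lower action bound, [-2.]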
3. Results of the DDPG Algorithm
The environment used here is gym's **'Pendulum-v0'**, a continuous-action environment whose action is a floating-point torque in [-2, 2]. The goal is to keep the pendulum at zero angle (upright) with minimal rotational speed and minimal effort.
After training for 200 episodes, the pendulum stays upright for longer and longer, which shows that the DDPG algorithm works.
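To see this behaviour directly, the trained agent can be rendered for a few greedy rollouts. The sketch below is illustrative: `watch` is a hypothetical helper, `agent` is the DDPG instance trained in main(), and rendering uses the classic gym API.

def watch(agent, episodes=3):
    for _ in range(episodes):
        obs = env.reset()
        for _ in range(STEPS):
            env.render()                                # draw the pendulum
            action = agent.choose_action(obs).numpy()   # greedy action, no exploration noise
            obs, reward, done, _ = env.step(action)
            if done:
                break
    env.close()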