DDPG Reinforcement Learning PyTorch Code

This is a PyTorch rewrite of the DDPG code from Morvan Zhou's TensorFlow reinforcement-learning tutorial. The complete code is listed below; it can also be downloaded from my profile.
'''
torch = 0.41
'''
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gym
import time
#####################  hyper parameters  ####################
MAX_EPISODES = 200
MAX_EP_STEPS = 200
LR_A = 0.001        # learning rate for actor
LR_C = 0.002        # learning rate for critic
GAMMA = 0.9         # reward discount
TAU = 0.01          # soft replacement
MEMORY_CAPACITY = 10000
BATCH_SIZE = 32

RENDER = False
ENV_NAME = 'Pendulum-v0'
###############################  DDPG  ####################################
class ANet(nn.Module):   # actor network: a = ae(s)
    def __init__(self, s_dim, a_dim):
        super(ANet, self).__init__()
        self.fc1 = nn.Linear(s_dim, 30)
        self.fc1.weight.data.normal_(0, 0.1)   # initialization
        self.out = nn.Linear(30, a_dim)
        self.out.weight.data.normal_(0, 0.1)   # initialization

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.out(x)
        x = F.tanh(x)
        actions_value = x * 2   # scale tanh output to Pendulum's action bound [-2, 2]
        return actions_value
class CNet(nn.Module):   # critic network: q = ce(s, a)
    def __init__(self, s_dim, a_dim):
        super(CNet, self).__init__()
        self.fcs = nn.Linear(s_dim, 30)
        self.fcs.weight.data.normal_(0, 0.1)   # initialization
        self.fca = nn.Linear(a_dim, 30)
        self.fca.weight.data.normal_(0, 0.1)   # initialization
        self.out = nn.Linear(30, 1)
        self.out.weight.data.normal_(0, 0.1)   # initialization

    def forward(self, s, a):
        x = self.fcs(s)
        y = self.fca(a)
        net = F.relu(x + y)
        actions_value = self.out(net)
        return actions_value
class DDPG(object):
    def __init__(self, a_dim, s_dim, a_bound):
        self.a_dim, self.s_dim, self.a_bound = a_dim, s_dim, a_bound
        self.memory = np.zeros((MEMORY_CAPACITY, s_dim * 2 + a_dim + 1), dtype=np.float32)
        self.pointer = 0
        self.Actor_eval = ANet(s_dim, a_dim)
        self.Actor_target = ANet(s_dim, a_dim)
        self.Critic_eval = CNet(s_dim, a_dim)
        self.Critic_target = CNet(s_dim, a_dim)
        self.atrain = torch.optim.Adam(self.Actor_eval.parameters(), lr=LR_A)
        self.ctrain = torch.optim.Adam(self.Critic_eval.parameters(), lr=LR_C)
        self.loss_td = nn.MSELoss()

    def choose_action(self, s):
        s = torch.unsqueeze(torch.FloatTensor(s), 0)
        return self.Actor_eval(s)[0].detach()   # ae(s)
    def learn(self):
        # soft target replacement: let the target networks slowly track the eval networks
        for x in self.Actor_target.state_dict().keys():
            eval('self.Actor_target.' + x + '.data.mul_((1-TAU))')
            eval('self.Actor_target.' + x + '.data.add_(TAU*self.Actor_eval.' + x + '.data)')
        for x in self.Critic_target.state_dict().keys():
            eval('self.Critic_target.' + x + '.data.mul_((1-TAU))')
            eval('self.Critic_target.' + x + '.data.add_(TAU*self.Critic_eval.' + x + '.data)')

        # sample a minibatch of transitions from the replay memory
        indices = np.random.choice(MEMORY_CAPACITY, size=BATCH_SIZE)
        bt = self.memory[indices, :]
        bs = torch.FloatTensor(bt[:, :self.s_dim])
        ba = torch.FloatTensor(bt[:, self.s_dim: self.s_dim + self.a_dim])
        br = torch.FloatTensor(bt[:, -self.s_dim - 1: -self.s_dim])
        bs_ = torch.FloatTensor(bt[:, -self.s_dim:])

        # actor update: maximize Q(s, ae(s)) by minimizing loss_a = -mean(Q)
        a = self.Actor_eval(bs)
        q = self.Critic_eval(bs, a)
        loss_a = -torch.mean(q)
        self.atrain.zero_grad()
        loss_a.backward()
        self.atrain.step()

        # critic update: the slowly-updated target networks provide a stable Q_target
        a_ = self.Actor_target(bs_)        # action predicted by the target actor for s_
        q_ = self.Critic_target(bs_, a_)   # Q value used in the bootstrapped target
        q_target = br + GAMMA * q_         # negative here, since Pendulum rewards are <= 0
        q_v = self.Critic_eval(bs, ba)     # ba is the action stored in memory
        # td_error = R + GAMMA * ct(bs_, at(bs_)) - ce(bs, ba); pulling ce(bs, ba) towards
        # Q_target makes the critic's evaluation more accurate
        td_error = self.loss_td(q_target, q_v)
        self.ctrain.zero_grad()
        td_error.backward()
        self.ctrain.step()
    def store_transition(self, s, a, r, s_):
        transition = np.hstack((s, a, [r], s_))
        index = self.pointer % MEMORY_CAPACITY   # replace the old memory with new memory
        self.memory[index, :] = transition
        self.pointer += 1
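# Replay-memory layout note (illustrative, for Pendulum-v0 where s_dim = 3 and a_dim = 1):
# each row of self.memory is [ s (3 cols) | a (1 col) | r (1 col) | s_ (3 cols) ],
# which is exactly what the bs, ba, br, bs_ slices in learn() pick apart.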
###############################  training  ####################################
env = gym.make(ENV_NAME)
env = env.unwrapped
env.seed(1)

s_dim = env.observation_space.shape[0]
a_dim = env.action_space.shape[0]
a_bound = env.action_space.high

ddpg = DDPG(a_dim, s_dim, a_bound)

var = 3   # control exploration
t1 = time.time()
for i in range(MAX_EPISODES):
    s = env.reset()
    ep_reward = 0
    for j in range(MAX_EP_STEPS):
        if RENDER:
            env.render()

        # Add exploration noise
        a = ddpg.choose_action(s)
        a = np.clip(np.random.normal(a, var), -2, 2)   # add randomness to action selection for exploration
        s_, r, done, info = env.step(a)

        ddpg.store_transition(s, a, r / 10, s_)

        if ddpg.pointer > MEMORY_CAPACITY:
            var *= .9995   # decay the action randomness
            ddpg.learn()

        s = s_
        ep_reward += r
        if j == MAX_EP_STEPS - 1:
            print('Episode:', i, ' Reward: %i' % int(ep_reward), 'Explore: %.2f' % var)
            if ep_reward > -300:
                RENDER = True
            break

print('Running time: ', time.time() - t1)
Training result: the reward curve figure from the original post is not reproduced here.
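In learn(), the soft target update is done through eval() calls on attribute-name strings, which works but is fragile. Below is a minimal sketch of an equivalent update written against state_dict() directly; soft_update and its tau argument are illustrative names rather than part of the original code.

def soft_update(target_net, eval_net, tau=TAU):
    # Polyak averaging: theta_target <- (1 - tau) * theta_target + tau * theta_eval
    target_state = target_net.state_dict()
    eval_state = eval_net.state_dict()
    for key in target_state.keys():
        target_state[key] = (1 - tau) * target_state[key] + tau * eval_state[key]
    target_net.load_state_dict(target_state)

# It could replace the two eval() loops at the top of DDPG.learn():
# soft_update(self.Actor_target, self.Actor_eval)
# soft_update(self.Critic_target, self.Critic_eval)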
Reinforcement learning based on an evolutionary algorithm (evolution strategies):
"""
According to morvanzhou.github.io/tutorials/
required pytorch=0.41
"""
import numpy as np
import gym
import multiprocessing as mp
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
N_KID = 10                  # half of the training population
N_GENERATION = 5000         # training steps
LR = .05                    # learning rate
SIGMA = .05                 # mutation strength or step size
N_CORE = mp.cpu_count() - 1
CONFIG = [
    dict(game="CartPole-v0",
         n_feature=4, n_action=2, continuous_a=[False], ep_max_step=700, eval_threshold=500),
    dict(game="MountainCar-v0",
         n_feature=2, n_action=3, continuous_a=[False], ep_max_step=200, eval_threshold=-120),
    dict(game="Pendulum-v0",
         n_feature=3, n_action=1, continuous_a=[True, 2.], ep_max_step=200, eval_threshold=-180)
][0]    # choose your game
class net(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(net, self).__init__()
        self.fc1 = nn.Linear(input_dim, 30)
        self.fc1.weight.data.normal_(0, 1)
        self.fc2 = nn.Linear(30, 20)
        self.fc2.weight.data.normal_(0, 1)
        self.fc3 = nn.Linear(20, output_dim)
        self.fc3.weight.data.normal_(0, 1)

    def forward(self, x):
        x = F.tanh(self.fc1(x))
        x = F.tanh(self.fc2(x))
        out = self.fc3(x)
        return out
def sign(k_id): return -1. if k_id % 2 == 0 else 1.   # mirrored sampling
class SGD(object):                      # optimizer with momentum
    def __init__(self, params, learning_rate, momentum=0.9):
        self.v = np.zeros(params).astype(np.float32)
        self.lr, self.momentum = learning_rate, momentum

    def get_gradients(self, gradients):
        self.v = self.momentum * self.v + (1. - self.momentum) * gradients
        return self.lr * self.v
def get_reward(network_param, num_p, env, ep_max_step, continuous_a, seed_and_id=None):
    # perturb parameters using the seed
    if seed_and_id is not None:
        seed, k_id = seed_and_id
        # for layer in network.children():
        #     np.random.seed(seed)
        #     layer.weight.data += torch.FloatTensor(sign(k_id) * SIGMA * np.random.randn(layer.weight.shape[0], layer.weight.shape[1]))
        #     np.random.seed(seed)
        #     layer.bias.data += torch.FloatTensor(sign(k_id) * SIGMA * np.random.randn(layer.bias.shape[0]))
        np.random.seed(seed)
        params = torch.FloatTensor(sign(k_id) * SIGMA * np.random.randn(num_p))
        Net = net(CONFIG['n_feature'], CONFIG['n_action'])
        Net.load_state_dict(network_param)
        # distribute the flat noise vector over each layer's weight and bias
        for layer in Net.children():
            n_w = layer.weight.shape[0] * layer.weight.shape[1]
            layer.weight.data += params[:n_w].view(layer.weight.shape[0], layer.weight.shape[1])
            layer.bias.data += params[n_w: n_w + layer.bias.shape[0]]
            params = params[n_w + layer.bias.shape[0]:]
    else:
        Net = net(CONFIG['n_feature'], CONFIG['n_action'])
        Net.load_state_dict(network_param)
    # run one episode with the (perturbed) network
    s = env.reset()
    ep_r = 0.
    for step in range(ep_max_step):
        a = get_action(Net, s, continuous_a)   # continuous_a: whether the action space is continuous
        s, r, done, _ = env.step(a)
        # mountain car's reward can be tricky
        if env.spec._env_name == 'MountainCar' and s[0] > -0.1:
            r = 0.
        ep_r += r
        if done:
            break
    return ep_r
def get_action(network, x, continuous_a):
    x = torch.unsqueeze(torch.FloatTensor(x), 0)
    x = network.forward(x)
    if not continuous_a[0]: return np.argmax(x.detach().numpy(), axis=1)[0]   # for discrete action
    else: return continuous_a[1] * np.tanh(x.detach().numpy())[0]             # for continuous action
def train(network_param, num_p, optimizer, utility, pool):
    # passing a seed instead of the whole noise matrix to the workers saves time
    noise_seed = np.random.randint(0, 2 ** 32 - 1, size=N_KID, dtype=np.uint32).repeat(2)   # mirrored sampling
    # generate one noise seed per population member; each seed is repeated for its mirrored twin
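The post is cut off here, before the body of train() finishes. For reference, below is a minimal sketch of the reward-weighted recombination step that a mirrored-sampling evolution strategy typically performs once all episode rewards are collected; estimate_gradient, rewards, and noise_seeds are illustrative names and not taken from the original code.

def estimate_gradient(rewards, noise_seeds, num_params, sigma=SIGMA):
    # Regenerate each worker's noise from its seed and combine the mirrored
    # noise vectors into one update direction, weighted by normalized reward.
    rewards = np.array(rewards, dtype=np.float32)
    weights = (rewards - rewards.mean()) / (rewards.std() + 1e-8)
    cumulative_update = np.zeros(num_params, dtype=np.float32)
    for k_id, (w, seed) in enumerate(zip(weights, noise_seeds)):
        np.random.seed(seed)                       # same seed the worker used in get_reward
        mirror = -1. if k_id % 2 == 0 else 1.      # mirrored sampling: paired +/- noise
        cumulative_update += w * mirror * np.random.randn(num_params)
    return cumulative_update / (len(rewards) * sigma)

# The resulting vector would then go through the momentum SGD optimizer above
# (optimizer.get_gradients) and be added to the flattened network parameters.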
