Revisiting "Implementing the Experience Replay Buffer in the DQN Reinforcement Learning Algorithm" (experience_replay_buf...)


Last year I wrote a blog post about building the experience replay module for DQN.
Recently I came across another implementation of the replay buffer, so I compared the two implementations.
memory.py, the new replay buffer implementation:
import numpy as np

Transition_dtype = np.dtype([
    ('timestep', np.int32),
    ('state', np.uint8, (84, 84)),
    ('action', np.uint8),
    ('reward', np.float32),
    ('nonterminal', np.bool_)])

# An all-blank transition record matching Transition_dtype
blank_trans = (
    0,
    np.zeros((84, 84), dtype=np.uint8),
    0,
    0.0,
    False)


class ArrayMemory():
    def __init__(self, size):
        self.index = 0
        self.size = size
        self.full = False  # Used to track actual capacity
        # Build structured array
        self.data = np.array(
            [blank_trans] * size,
            dtype=Transition_dtype)

    def append(self, sarst_data):
        # Store data in underlying data structure
        self.data[self.index] = sarst_data
        self.index = (self.index + 1) % self.size  # Update index
        # Mark the buffer as full once the write pointer wraps around
        if self.index == 0:
            self.full = True

    # Returns data given a data index
    def get(self, data_index):
        return self.data[data_index % self.size]

    def total(self):
        if self.full:
            return self.size
        else:
            return self.index

class Replay():
    def __init__(self, args):
        self.t = 0  # Internal episode timestep counter
        self.n = 1  # td(0)
        self.history_length = args.history_length
        self.discount = args.discount
        self.capacity = args.memory_capacity
        self.transitions = ArrayMemory(self.capacity)
        # Scaling factors for the truncated n-step discounted return
        self.reward_n_step_scaling = np.array(
            [self.discount ** i for i in range(self.n)], dtype=np.float32)

    def append(self, state, action, reward, terminal):
        """ Adds state and action at time t, reward and terminal at time t + 1 """
        # Only store the last frame and discretise to save memory
        self.transitions.append((self.t, state, action, reward, not terminal))
        if terminal:
            self.t = 0  # Start new episodes with t = 0
        else:
            self.t += 1

    # Returns the transitions with blank states where appropriate
    def _get_transitions(self, idxs):
        transition_idxs = np.arange(-self.history_length + 1, self.n + 1) \
            + np.expand_dims(idxs, axis=1)
        transitions = self.transitions.get(transition_idxs)
        transitions_firsts = transitions['timestep'] == 0
        blank_mask = np.zeros_like(
            transitions_firsts,
            dtype=np.bool_)
        for t in range(self.history_length - 2, -1, -1):  # e.g. 2 1 0
            # True if future frame has timestep 0
            blank_mask[:, t] = np.logical_or(
                blank_mask[:, t + 1], transitions_firsts[:, t + 1])
        for t in range(self.history_length, self.history_length + self.n):  # e.g. 4 5 6
            # True if current or past frame has timestep 0
            blank_mask[:, t] = np.logical_or(
                blank_mask[:, t - 1], transitions_firsts[:, t])
        transitions[blank_mask] = blank_trans
        return transitions

    # Returns a batch of valid samples
    def _get_samples(self, batch_size, n_total):
        idxs = []
        while len(idxs) < batch_size:
            idx = np.random.randint(0, n_total - 1)  # Uniformly sample an index
            # Reject indices too close to the write pointer on either side
            if (self.transitions.index - idx) % self.capacity >= self.n and \
                    (idx - self.transitions.index) % self.capacity >= self.history_length - 1:
                idxs.append(idx)
        # Retrieve all required transition data (from t - h to t + n)
        transitions = self._get_transitions(idxs)
        # Create un-discretised states and nth next states
        all_states = transitions['state']
        states = all_states[:, :self.history_length]
        next_states = all_states[:, self.n:self.n + self.history_length]
        # Discrete actions to be used as index
        actions = transitions['action'][:, self.history_length - 1]
        # Calculate truncated n-step discounted returns
        rewards = transitions['reward'][:, self.history_length - 1:-1]
        ret = np.matmul(rewards, self.reward_n_step_scaling)
        # Mask for non-terminal nth next states
        nonterminals = transitions['nonterminal'][:, self.history_length + self.n - 1]
        return states, actions, ret, next_states, nonterminals

    def sample(self, batch_size):
        n_total = self.transitions.total()
        states, actions, returns, next_states, nonterminals = \
            self._get_samples(batch_size, n_total)
        # (np.uint8, (84, 84)), np.int32, np.float32, (np.uint8, (84, 84)), np.uint8
        # s, a, r, s_next, non_terminal
        return np.asarray(states, np.uint8), \
            np.asarray(actions, np.int32), \
            np.asarray(returns, np.float32), \
            np.asarray(next_states, np.uint8), \
            np.asarray(nonterminals, np.uint8)
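
The core trick in the new implementation is that every transition lives in a single NumPy structured array, so a whole (batch, history + n) window of records can be gathered with one fancy-indexing call instead of Python-level loops. Here is a minimal standalone sketch of that idea; the tiny_dtype name, the 4x4 "screen" and all the values are made up for illustration and are not taken from the code above:

import numpy as np

# Hypothetical miniature transition record, same structure as Transition_dtype above
tiny_dtype = np.dtype([('timestep', np.int32),
                       ('state', np.uint8, (4, 4)),
                       ('action', np.uint8),
                       ('reward', np.float32),
                       ('nonterminal', np.bool_)])

buf = np.zeros(8, dtype=tiny_dtype)  # ring buffer with 8 slots
for t in range(8):
    buf[t] = (t, np.full((4, 4), t, dtype=np.uint8), t % 3, float(t), True)

# Two sampled indices, each expanded into a window of 3 consecutive records
idxs = np.array([[2, 3, 4],
                 [5, 6, 7]])
window = buf[idxs % len(buf)]   # one vectorised gather
print(window['state'].shape)    # (2, 3, 4, 4): batch of stacked frames
print(window['reward'])         # per-step rewards, ready for n-step returns

Because the fancy-indexed gather returns a copy, the sampled window can later be overwritten with blank records (as _get_transitions does) without touching the underlying buffer.
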
The replay buffer implementation from my original post:
mem.py
# encoding:UTF-8
"""Code adapted from https://github.com/tambetm/simple_dqn/blob/master/src/replay_memory.py"""
import random
import numpy as np


class ReplayBuffer(object):
    def __init__(self, config):
        self.s = 0  # counts sampled indices rejected and re-drawn in sample()
        self.screen_format = config.screen_format  # layout of returned batches, 'NCHW' or 'NHWC'
        self.buffer_size = config.replay_buffer_size  # maximum capacity of the buffer
        self.history_length = config.history_length  # number of past frames stacked into one state
        self.dims = (config.screen_height, config.screen_width)  # height and width of one frame
        self.batch_size = config.batch_size  # mini-batch size
        self.count = 0  # number of records currently stored
        self.current = 0  # write pointer: index where the next frame will be stored
        """ experience replay buffer: pre_state -> a, r, s, terminal """
        self.actions = np.empty(self.buffer_size, dtype=np.uint8)
        self.rewards = np.empty(self.buffer_size, dtype=np.float32)
        self.terminals = np.empty(self.buffer_size, dtype=np.bool_)
        self.screens = np.empty((self.buffer_size,) + self.dims,
                                dtype=np.float32)  # all screen frames, i.e. the raw states
        # state s in (s, a, s+1): the current state, before the action is taken
        self.prestates = np.empty((self.batch_size, self.history_length) + self.dims,
                                  dtype=np.float32)
        # state s+1 in (s, a, s+1): the next state
        self.poststates = np.empty((self.batch_size, self.history_length) + self.dims,
                                   dtype=np.float32)
        # sanity-check the configuration
        assert self.history_length >= 1  # a state is built from at least one frame

    def add(self, action, reward, screen, terminal):
        """ Add a new (a, r, s, terminal) record to the experience buffer """
        assert screen.shape == self.dims  # check that the incoming screen has the configured shape
        # screen is the post-state: the frame observed after the previous state took action
        # and received reward; current points at the slot where this record is stored
        self.actions[self.current] = action
        self.rewards[self.current] = reward
        self.screens[self.current, ...] = screen
        self.terminals[self.current] = terminal
        # While the buffer is not yet full, count follows current; once full, count stays fixed
        # at buffer_size and current keeps wrapping around, overwriting the oldest records
        self.count = max(self.count, self.current + 1)
        # advance the write pointer after inserting the new record
        self.current = (self.current + 1) % self.buffer_size  # buffer_size: capacity of the buffer

    def getState(self, index):
        # stack the history_length frames ending at index into one state
        return self.screens[(index - (self.history_length - 1)):(index + 1), ...]

    def sample(self):
        # memory must include poststate, prestate and history
        assert self.count > self.history_length  # need at least history_length + 1 stored frames
        # sample random indexes
        indexes = []
        while len(indexes) < self.batch_size:
            # find a random valid index
            while True:
                # sample one index (ignore states wrapping over the beginning of memory)
                index = random.randint(self.history_length, self.count - 1)
                # if it wraps over the current write pointer, draw a new one
                if index - self.history_length < self.current <= index:
                    continue
                # if it wraps over an episode end, draw a new one
                # (the poststate, i.e. the last screen, is allowed to be terminal)
                if self.terminals[(index - self.history_length):index].any():
                    self.s += 1
                    continue
                # otherwise use this index
                break
            # having index first is fastest in C-order matrices
            self.prestates[len(indexes), ...] = self.getState(index - 1)
            self.poststates[len(indexes), ...] = self.getState(index)
            indexes.append(index)
        actions = self.actions[indexes]
        rewards = self.rewards[indexes]
        terminals = self.terminals[indexes]
        # return s, a, r, s+1, terminal
        if self.screen_format == 'NHWC':
            return np.transpose(self.prestates, (0, 2, 3, 1)), actions, \
                rewards, np.transpose(self.poststates, (0, 2, 3, 1)), terminals
        else:  # format is 'NCHW', faster than 'NHWC'
            return self.prestates, actions, rewards, self.poststates, terminals
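
By contrast, the older buffer keeps screens, actions, rewards and terminals in separate flat arrays, stacks history_length consecutive screens into a state only at sampling time, and transposes to NHWC on request. Here is a small standalone sketch of that stacking and layout transpose; the buffer length and index below are chosen arbitrarily for illustration:

import numpy as np

history_length, height, width = 4, 84, 84
screens = np.random.random((1000, height, width)).astype(np.float32)  # flat frame buffer

index = 37  # an arbitrary valid index with enough history before it
# getState-style slice: the history_length frames ending at index, laid out as CHW
state = screens[(index - (history_length - 1)):(index + 1), ...]   # (4, 84, 84)

batch = np.stack([state] * 32)              # an 'NCHW' mini-batch: (32, 4, 84, 84)
nhwc = np.transpose(batch, (0, 2, 3, 1))    # 'NHWC' layout: (32, 84, 84, 4)
print(batch.shape, nhwc.shape)

The per-sample Python loop and the repeated getState copies are exactly what the speed.py script below is meant to measure against the vectorised structured-array approach.
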
The script used for the speed comparison:
speed.py
# encoding:UTF-8
import numpy as np
import time


class Config(object):
    def __init__(self):
        self.screen_format = "NCHW"
        self.history_length = 4
        self.screen_height = 84  # 100
        self.screen_width = 84  # 100
        self.batch_size = 32
        self.replay_buffer_size = 5 * 10000  # 100*10000, capacity used by mem.ReplayBuffer
        self.memory_capacity = 5 * 10000  # 100*10000, capacity used by memory.Replay
        self.discount = 0.1


config = Config()


def last_year():
    from mem import ReplayBuffer as ReplayBuffer_1
    rf = ReplayBuffer_1(config)
    state = np.random.random([config.screen_height, config.screen_width])
    action = np.uint8(0)
    reward = np.int8(1)
    for i in range(5000 * 10000):  # total number of steps
        terminal = np.random.choice([True, False], size=1, p=[0.1, 0.9])[0]
        rf.add(action, reward, state, terminal)
        if rf.count >= 5 * 10000:  # step at which sampling starts
            rf.sample()
        if i % 10000 == 0:
            print(i)
        if i == 5 * 10000:
            a = time.time()
        if i == 55 * 10000:
            b = time.time()
            break
    print(b - a)
    print(rf.s)


def this_year():
    from memory import Replay
    rf = Replay(config)
    state = np.random.random([config.screen_height, config.screen_width])
    action = np.uint8(0)
    reward = np.int8(1)
    for i in range(5000 * 10000):  # total number of steps
        terminal = np.random.choice([True, False], size=1, p=[0.1, 0.9])[0]
        rf.append(state, action, reward, terminal)
        if rf.transitions.total() >= 5 * 10000:  # step at which sampling starts
            rf.sample(32)
        if i % 10000 == 0:
            print(i)
        if i == 5 * 10000:
            a = time.time()
        if i == 55 * 10000:
            b = time.time()
            break
    print(b - a)
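
One plausible way to drive the comparison (my assumption; the function calls are not shown in the listing above) is to run each benchmark from a main guard and compare the printed timings:

if __name__ == "__main__":
    # Hypothetical driver, not from the original script: run one benchmark at a time
    last_year()    # times the flat-array ReplayBuffer from mem.py
    # this_year()  # times the structured-array Replay from memory.py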
