A PyTorch Implementation of the Transformer (Super Detailed)
This post walks through how to reproduce the Transformer in PyTorch and use it for a simple machine translation task. Please spend about 15 minutes reading my earlier article on the Transformer first; coming back to this post afterwards will make everything click much faster and save you a lot of effort.
Data Preprocessing
I did not use any large dataset here. Instead I hand-wrote two German→English sentence pairs, and even the index of every word is hard-coded. This is purely to keep the code easy to read, so that readers can focus on the model implementation itself.
import math
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data

# S: Symbol that shows starting of decoding input
# E: Symbol that shows ending of decoding output
# P: Symbol that will fill in blank sequence if current batch data size is shorter than time steps
sentences = [
    # enc_input               dec_input            dec_output
    ['ich mochte ein bier P', 'S i want a beer .', 'i want a beer . E'],
    ['ich mochte ein cola P', 'S i want a coke .', 'i want a coke . E']
]

# Padding Should be Zero
src_vocab = {'P': 0, 'ich': 1, 'mochte': 2, 'ein': 3, 'bier': 4, 'cola': 5}
src_vocab_size = len(src_vocab)

tgt_vocab = {'P': 0, 'i': 1, 'want': 2, 'a': 3, 'beer': 4, 'coke': 5, 'S': 6, 'E': 7, '.': 8}
idx2word = {i: w for i, w in enumerate(tgt_vocab)}
tgt_vocab_size = len(tgt_vocab)

src_len = 5  # enc_input max sequence length
tgt_len = 6  # dec_input (=dec_output) max sequence length

def make_data(sentences):
    enc_inputs, dec_inputs, dec_outputs = [], [], []
    for i in range(len(sentences)):
        enc_input = [[src_vocab[n] for n in sentences[i][0].split()]]   # [[1, 2, 3, 4, 0], [1, 2, 3, 5, 0]]
        dec_input = [[tgt_vocab[n] for n in sentences[i][1].split()]]   # [[6, 1, 2, 3, 4, 8], [6, 1, 2, 3, 5, 8]]
        dec_output = [[tgt_vocab[n] for n in sentences[i][2].split()]]  # [[1, 2, 3, 4, 8, 7], [1, 2, 3, 5, 8, 7]]
        enc_inputs.extend(enc_input)
        dec_inputs.extend(dec_input)
        dec_outputs.extend(dec_output)
    return torch.LongTensor(enc_inputs), torch.LongTensor(dec_inputs), torch.LongTensor(dec_outputs)

enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
class MyDataSet(Data.Dataset):
    def __init__(self, enc_inputs, dec_inputs, dec_outputs):
        super(MyDataSet, self).__init__()
        self.enc_inputs = enc_inputs
        self.dec_inputs = dec_inputs
        self.dec_outputs = dec_outputs

    def __len__(self):
        return self.enc_inputs.shape[0]

    def __getitem__(self, idx):
        return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]

loader = Data.DataLoader(MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)
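With only two sentence pairs and a batch size of 2, each pass over loader yields the whole dataset in a single batch. As a quick sanity check (a minimal sketch of my own, not part of the original code), you can print the tensor shapes it produces:
# Illustrative only: inspect the shapes produced by the DataLoader
for enc_batch, dec_batch, out_batch in loader:
    print(enc_batch.shape)  # torch.Size([2, 5]) -> [batch_size, src_len]
    print(dec_batch.shape)  # torch.Size([2, 6]) -> [batch_size, tgt_len]
    print(out_batch.shape)  # torch.Size([2, 6]) -> [batch_size, tgt_len]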
Model Parameters
The variables below stand for, in order:
1. The dimension of the word embeddings and the positional embeddings; the two are equal, so a single variable is enough
2. The number of hidden units in the FeedForward layer
3. The dimensions of the Q, K and V vectors; Q and K must have the same dimension, while V's dimension is unconstrained, but for convenience I set all of them to 64
4. The number of Encoder and Decoder layers
5. The number of heads in Multi-Head Attention
# Transformer Parameters
d_model = 512   # Embedding Size
d_ff = 2048     # FeedForward dimension
d_k = d_v = 64  # dimension of K (=Q), V
n_layers = 6    # number of Encoder and Decoder layers
n_heads = 8     # number of heads in Multi-Head Attention
Everything above is fairly simple. From here on we get into the model itself, which is more involved, so I will split it into the following parts and walk through them one by one:
Positional Encoding
Pad Mask (sentences shorter than the maximum length are padded, so the pad positions must be masked)
Subsequence Mask (the Decoder input must not see words at future time steps, so they must be masked)
ScaledDotProductAttention (computes the context vector)
Multi-Head Attention
FeedForward Layer
Encoder Layer
Encoder
Decoder Layer
Decoder
Transformer
About the comments in the code: wherever a length is definitely src_len or tgt_len I spell it out. Some functions and classes, however, may be called from either the Encoder or the Decoder, so the length cannot be pinned down to src_len or tgt_len; in those cases I write it as seq_len.
Positional Encoding
def get_sinusoid_encoding_table(n_position, d_model):
    def cal_angle(position, hid_idx):
        return position / np.power(10000, 2 * (hid_idx // 2) / d_model)
    def get_posi_angle_vec(position):
        return [cal_angle(position, hid_j) for hid_j in range(d_model)]

    sinusoid_table = np.array([get_posi_angle_vec(pos_i) for pos_i in range(n_position)])
    sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2])  # dim 2i
    sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2])  # dim 2i+1
    return torch.FloatTensor(sinusoid_table)
This code is not complicated. The arguments are the vocabulary size (used here as the number of positions) and the dimension of the positional encoding. The returned table has shape [n_position, d_model], exactly the same as the word embedding table.
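As a quick check (an illustrative sketch of my own, not from the original article), you can verify the shape of the table and the sin/cos interleaving:
# Illustrative only: inspect the sinusoid table
table = get_sinusoid_encoding_table(src_vocab_size, d_model)
print(table.shape)  # torch.Size([6, 512]) -> [n_position, d_model]
print(table[0])     # position 0: the sin entries (even dims) are 0, the cos entries (odd dims) are 1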
Pad Mask
def get_attn_pad_mask(seq_q, seq_k):
    '''
    seq_q: [batch_size, seq_len]
    seq_k: [batch_size, seq_len]
    seq_len could be src_len or it could be tgt_len
    seq_len in seq_q and seq_len in seq_k maybe not equal
    '''
    batch_size, len_q = seq_q.size()
    batch_size, len_k = seq_k.size()
    # eq(zero) is PAD token
    pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)  # [batch_size, 1, len_k], True is masked
    return pad_attn_mask.expand(batch_size, len_q, len_k)  # [batch_size, len_q, len_k]
Since masking is needed in both the Encoder and the Decoder, the value of seq_len in this function's arguments cannot be fixed in advance. When it is called from the Encoder, seq_len equals src_len; when it is called from the Decoder, seq_len may equal either src_len or tgt_len (because the Decoder performs masking twice).
The core line of this function is seq_k.data.eq(0), which returns a tensor of the same size as seq_k containing only True and False values: positions where seq_k equals 0 become True, and all other positions become False. For example, with seq_data = [1, 2, 3, 4, 0], seq_data.data.eq(0) returns [False, False, False, False, True].
The rest of the code just expands the dimensions. I strongly recommend printing the result to see what the returned tensor looks like, for example with the snippet below.
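Here is a minimal sketch (my own addition) that prints the pad mask for enc_inputs:
# Illustrative only: print the pad mask for the encoder inputs
mask = get_attn_pad_mask(enc_inputs, enc_inputs)
print(mask.shape)  # torch.Size([2, 5, 5]) -> [batch_size, src_len, src_len]
print(mask[0])     # the last column is True because the 5th token of each sentence is 'P' (index 0)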
Subsequence Mask
def get_attn_subsequence_mask(seq):
    '''
    seq: [batch_size, tgt_len]
    '''
    attn_shape = [seq.size(0), seq.size(1), seq.size(1)]
    subsequence_mask = np.triu(np.ones(attn_shape), k=1)  # Upper triangular matrix
    subsequence_mask = torch.from_numpy(subsequence_mask).byte()
    return subsequence_mask  # [batch_size, tgt_len, tgt_len]
The Subsequence Mask is used only by the Decoder; its job is to hide information about words at future time steps. First np.ones() builds an all-ones square matrix, then np.triu() keeps only its strict upper triangle. The snippet below shows how np.triu() behaves.
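Here is a small demonstration of np.triu (my own snippet, standing in for the figure in the original post):
# Illustrative only: np.triu with k=1 zeroes the main diagonal and everything below it
m = np.ones((4, 4))
print(np.triu(m, k=1))
# [[0. 1. 1. 1.]
#  [0. 0. 1. 1.]
#  [0. 0. 0. 1.]
#  [0. 0. 0. 0.]]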
ScaledDotProductAttention
class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()

    def forward(self, Q, K, V, attn_mask):
        '''
        Q: [batch_size, n_heads, len_q, d_k]
        K: [batch_size, n_heads, len_k, d_k]
        V: [batch_size, n_heads, len_v(=len_k), d_v]
        attn_mask: [batch_size, n_heads, seq_len, seq_len]
        '''
        scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k)  # scores: [batch_size, n_heads, len_q, len_k]
        scores.masked_fill_(attn_mask, -1e9)  # Fills elements of self tensor with value where mask is True.
        attn = nn.Softmax(dim=-1)(scores)
        context = torch.matmul(attn, V)  # [batch_size, n_heads, len_q, d_v]
        return context, attn
What happens here: attention scores are computed from Q and K, and the scores (after softmax) are multiplied with V to obtain a context vector for every word.
The first step, multiplying Q with the transpose of K, needs no further comment. The resulting scores cannot be softmaxed right away: the positions that must be hidden first have to be masked out using attn_mask (via masked_fill_, which writes -1e9 into them). attn_mask is a tensor containing only True and False values, and its four dimensions are guaranteed to match those of scores, otherwise the element-wise masking would be impossible.
Once masking is done, scores can be softmaxed and then multiplied with V to obtain the context.
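A tiny standalone example (my own sketch) of the masking step on made-up scores:
# Illustrative only: masking before softmax drives the masked position's probability to ~0
scores = torch.tensor([[1.0, 2.0, 3.0]])
mask = torch.tensor([[False, False, True]])  # mask the last position
scores.masked_fill_(mask, -1e9)
print(nn.Softmax(dim=-1)(scores))  # tensor([[0.2689, 0.7311, 0.0000]])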
MultiHeadAttention
class MultiHeadAttention(nn.Module):
    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        self.W_Q = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
        self.fc = nn.Linear(n_heads * d_v, d_model, bias=False)

    def forward(self, input_Q, input_K, input_V, attn_mask):
        '''
        input_Q: [batch_size, len_q, d_model]
        input_K: [batch_size, len_k, d_model]
        input_V: [batch_size, len_v(=len_k), d_model]
        attn_mask: [batch_size, seq_len, seq_len]
        '''
        residual, batch_size = input_Q, input_Q.size(0)
        # (B, S, D) -proj-> (B, S, D_new) -split-> (B, S, H, W) -trans-> (B, H, S, W)
        Q = self.W_Q(input_Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2)  # Q: [batch_size, n_heads, len_q, d_k]
        K = self.W_K(input_K).view(batch_size, -1, n_heads, d_k).transpose(1, 2)  # K: [batch_size, n_heads, len_k, d_k]
        V = self.W_V(input_V).view(batch_size, -1, n_heads, d_v).transpose(1, 2)  # V: [batch_size, n_heads, len_v(=len_k), d_v]

        attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)  # attn_mask: [batch_size, n_heads, seq_len, seq_len]

        # context: [batch_size, n_heads, len_q, d_v], attn: [batch_size, n_heads, len_q, len_k]
        context, attn = ScaledDotProductAttention()(Q, K, V, attn_mask)
        context = context.transpose(1, 2).reshape(batch_size, -1, n_heads * d_v)  # context: [batch_size, len_q, n_heads * d_v]
        output = self.fc(context)  # [batch_size, len_q, d_model]
        return nn.LayerNorm(d_model)(output + residual), attn
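Before looking at where this module is called, here is a quick shape check (a sketch of my own, not from the original article) on random embeddings:
# Illustrative only: run MultiHeadAttention once and check the output shapes
x = torch.randn(2, src_len, d_model)                  # [batch_size, src_len, d_model]
pad_mask = get_attn_pad_mask(enc_inputs, enc_inputs)  # [batch_size, src_len, src_len]
out, attn = MultiHeadAttention()(x, x, x, pad_mask)
print(out.shape)   # torch.Size([2, 5, 512])
print(attn.shape)  # torch.Size([2, 8, 5, 5])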
In the full code, MultiHeadAttention() is called in exactly three places. The Encoder Layer calls it once, passing enc_inputs for input_Q, input_K and input_V alike. The Decoder Layer calls it twice: the first call passes dec_inputs for all three arguments, while the second passes dec_outputs, enc_outputs and enc_outputs respectively.
FeedForward Layer
class PoswiseFeedForwardNet(nn.Module):
    def __init__(self):
        super(PoswiseFeedForwardNet, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(d_model, d_ff, bias=False),
            nn.ReLU(),
            nn.Linear(d_ff, d_model, bias=False)
        )

    def forward(self, inputs):
        '''
        inputs: [batch_size, seq_len, d_model]
        '''
        residual = inputs
        output = self.fc(inputs)
        return nn.LayerNorm(d_model)(output + residual)  # [batch_size, seq_len, d_model]
This code is very simple: two linear transformations, a residual connection, and a Layer Norm on top.
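A quick check (illustrative only) that the feed-forward block preserves the input shape:
# Illustrative only: the block maps [batch_size, seq_len, d_model] back to the same shape
x = torch.randn(2, src_len, d_model)
print(PoswiseFeedForwardNet()(x).shape)  # torch.Size([2, 5, 512])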
Encoder Layer
class EncoderLayer(nn.Module):
    def __init__(self):
        super(EncoderLayer, self).__init__()
        self.enc_self_attn = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()

    def forward(self, enc_inputs, enc_self_attn_mask):
        '''
        enc_inputs: [batch_size, src_len, d_model]
        enc_self_attn_mask: [batch_size, src_len, src_len]
        '''
        # enc_outputs: [batch_size, src_len, d_model], attn: [batch_size, n_heads, src_len, src_len]
        enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask)  # same enc_inputs to Q, K, V
        enc_outputs = self.pos_ffn(enc_outputs)  # enc_outputs: [batch_size, src_len, d_model]
        return enc_outputs, attn
Stitching the components above together gives a complete Encoder Layer.
Encoder
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.src_emb = nn.Embedding(src_vocab_size, d_model)
        self.pos_emb = nn.Embedding.from_pretrained(get_sinusoid_encoding_table(src_vocab_size, d_model), freeze=True)
        self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])

    def forward(self, enc_inputs):
        '''
        enc_inputs: [batch_size, src_len]
        '''
        word_emb = self.src_emb(enc_inputs)  # [batch_size, src_len, d_model]
        pos_emb = self.pos_emb(enc_inputs)   # [batch_size, src_len, d_model]
        enc_outputs = word_emb + pos_emb
        enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs)  # [batch_size, src_len, src_len]
        enc_self_attns = []
        for layer in self.layers:
            # enc_outputs: [batch_size, src_len, d_model], enc_self_attn: [batch_size, n_heads, src_len, src_len]
            enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask)
            enc_self_attns.append(enc_self_attn)
        return enc_outputs, enc_self_attns
The argument to nn.ModuleList() is a list, which here holds n_layers Encoder Layers.
Because the Encoder Layer is designed so that its input and output have the same shape, a plain for loop can stack the layers, feeding the output of one Encoder Layer as the input of the next, as the short check below illustrates.
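Here is a short check (my own sketch, not part of the original code) that runs the Encoder on the toy batch:
# Illustrative only: run the Encoder and check the output shapes
encoder = Encoder()
enc_outputs, enc_self_attns = encoder(enc_inputs)
print(enc_outputs.shape)        # torch.Size([2, 5, 512]) -> [batch_size, src_len, d_model]
print(len(enc_self_attns))      # 6, one attention map per layer
print(enc_self_attns[0].shape)  # torch.Size([2, 8, 5, 5])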
Decoder Layer