tensorflow2:attention机制实现
代码参考⽹络上资料,如有侵权,可联系删除
1. 为什么进⾏attention
物理意义:将 Q、K 投影到不同的空间上,然后学习二者的相似度;V 是 key 对应的内容表示。WQ、WK 的初始化不同,是 self-attention 具有泛化能力的原因,从而能够学习到序列中的语义依赖关系。
如果不设置 W_Q、W_k、W_v,我们计算的权重很⼤程度上依赖于我们如何确定原始输⼊向量。
2. 官⽹实现
tf.keras.layers.Attention(
    use_scale=False, **kwargs
)
输⼊为形状[batch_size,Tq,dim]的查询张量,形状[batch_size,Tv,dim]的值张量和形状[batch_size,Tv,dim]的键张量
计算遵循以下步骤:
计算形状为[batch_size,Tq,Tv]的分数作为查询与键的点积:
scores = tf.matmul(query, key, transpose_b=True)。
使⽤分数来计算形状为[batch_size,Tq,Tv]的分布:
distribution = tf.nn.softmax(scores)。
使⽤distribution创建具有形状[batch_size,Tq,dim]的值的线性组合:
返回tf.matmul(distribution,value)。
参数:
use_scale:如果为 True,将创建一个标量变量来缩放 scores。
causal:设置为 True 可用于解码器 self-attention。添加一个掩码,使位置 i 无法关注位置 j > i,这样可以防止信息从未来流向过去。
dropout:对 attention scores 进行 dropout 的比例。
3. 自定义 self-attention
# Scaled dot-product attention.
def scaled_dot_product_attention(q, k, v, mask):
    """Compute scaled dot-product attention.

    Args:
        q: query tensor, shape == (..., seq_len_q, depth).
        k: key tensor, shape == (..., seq_len_k, depth).
        v: value tensor, shape == (..., seq_len_v, depth_v), with
            seq_len_v == seq_len_k.
        mask: tensor of shape (seq_len_q, seq_len_k) (or anything that
            broadcasts against the logits), or None. It is multiplied by
            -1e9 and added to the logits, so non-zero entries are masked out.

    Returns:
        A tuple ``(output, attention_weights)``:
        - output: attention-weighted sum of ``v``,
          shape (..., seq_len_q, depth_v).
        - attention_weights: softmax weights,
          shape (..., seq_len_q, seq_len_k).
    """
    print('---q', tf.shape(q))
    print('---k', tf.shape(k))
    print('---v', tf.shape(v))

    # matmul_qk.shape: (..., seq_len_q, seq_len_k)
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    print('---qk', tf.shape(matmul_qk))

    # Scale by sqrt of the key depth. If the vectors are initialised from a
    # standard normal distribution, the q.k product sums over the key
    # dimension, so its variance grows to dk; dividing by sqrt(dk) undoes that.
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    # scaled_attention_logits.shape: (..., seq_len_q, seq_len_k)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    if mask is not None:
        # Add a very large negative number at the masked (padding) positions
        # so that their softmax weight is driven towards 0.
        scaled_attention_logits += (mask * -1e9)

    # Softmax over the last axis (the key axis).
    # attention_weights.shape: (..., seq_len_q, seq_len_k)
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    print('---attention_weights', tf.shape(attention_weights))

    # out_puts.shape: (..., seq_len_q, depth_v)
    out_puts = tf.matmul(attention_weights, v)
    print('---out_puts', tf.shape(out_puts))
    return out_puts, attention_weights
# test dot-attention
# def print_scaled_dot_product_attention(q, k, v):
# temp_out, temp_att = scaled_dot_product_attention(q, k, v, None)
# return temp_out, temp_att
#
# temp_k = tf.constant([[1, 3, 0],
# [2, 1, 0],
# [3, 2, 0],
# [4, 1, 0]
# ], dtype=tf.float32)
#
# temp_v = tf.constant([[1, 2],
# [2, 1],
# [3, 2],
# [4, 1]
# ], dtype=tf.float32) # q_len_v = q_len_k
#
# temp_q = tf.constant([[1, 3, 0],
# [2, 3, 0],
# [4, 1, 0]
# ], dtype=tf.float32) # q 的depth 与 k的depth 需要相同
#
# temp_out, temp_att = print_scaled_dot_product_attention(temp_q, temp_k, temp_v)
#
# print(temp_out)
# print(temp_att)
#
# Attention with a mask.
def attention_mask():
    """Compare the masking of ``layers.Attention`` with a hand-written version.

    Builds a tiny query/key/value example, runs it through the Keras
    ``Attention`` layer with a value mask, and recomputes the same masked
    attention manually.

    Returns:
        A tuple ``(atten, att)``: the output of the Keras layer and the
        manually computed attention output, so the two can be compared.
    """
    attention = layers.Attention(use_scale=False, dropout=0)

    # key / value: [batch_sz, key_dims, dims] = [1, 4, 2]
    enc_outputs = tf.constant(
        [[[1, 1], [2, 2], [3, 3], [4, 4]]], dtype=tf.float32)

    # Value mask: positions where mask == False do not contribute to the
    # output.
    value_mask = tf.constant([[True, True, False, False]], dtype=tf.bool)

    # query: [batch_sz, query_dims, dims] = [1, 1, 2]
    dec_outputs = tf.constant([[[1, 1]]], dtype=tf.float32)

    # Inputs are passed as [query, value, key]; masks as
    # [query_mask, value_mask].
    atten = attention(
        [dec_outputs, enc_outputs, enc_outputs], mask=[None, value_mask])

    # Manual verification.
    # tf.reduce_sum(enc_outputs * dec_outputs, 2) is equivalent to
    # tf.matmul(q, k, transpose_b=True) for this single-query example.
    score = (tf.reduce_sum(enc_outputs * dec_outputs, 2)
             - 1.e9 * (1 - tf.cast(value_mask, dtype=tf.float32)))
    weight = keras.activations.softmax(score, axis=1)
    att = tf.expand_dims(weight, 2) * enc_outputs
    att = tf.reduce_sum(att, 1)

    # The above checks that the built-in masking and the hand-written masking
    # agree.
    # NOTE: once dot-product scaling (1 / sqrt(dk)) is added, as below, the
    # result differs from what the layer's use_scale option gives:
    # dk = tf.cast(tf.shape(enc_outputs)[-1], tf.float32)
    # score = tf.reduce_sum(enc_outputs * dec_outputs, 2) / tf.math.sqrt(dk) \
    #     - 1.e9 * (1 - tf.cast(value_mask, dtype=tf.float32))
    return atten, att
# Single-head self-attention.
class OneHeadAttention(keras.layers.Layer):
    """Single-head attention layer built on scaled dot-product attention.

    Conceptually:
        x --Wq0--> q0
        x --Wk0--> k0
        x --Wv0--> v0
    In pure self-attention q, k and v all come from the same input; in
    practice the three inputs may differ, each going through its own
    projection.

    The small per-head matrix multiplications can be fused into one large
    one whose result is then split:
        q --WQ (large matrix)--> Q --split--> q0, q1, q2
    and likewise for k and v.
    """

    def __init__(self, d_model):
        """Create the projection layers.

        Args:
            d_model: output size of the q/k/v projections and of the final
                dense layer.
        """
        super(OneHeadAttention, self).__init__()
        self.d_model = d_model
        self.WQ = keras.layers.Dense(self.d_model)
        self.WK = keras.layers.Dense(self.d_model)
        self.WV = keras.layers.Dense(self.d_model)
        self.dense = keras.layers.Dense(self.d_model)

    def call(self, q, k, v, mask):
        """Project q, k, v and apply scaled dot-product attention.

        Args:
            q: query input, (batch_size, seq_len_q, dim).
            k: key input, (batch_size, seq_len_k, dim).
            v: value input, (batch_size, seq_len_v, dim).
            mask: mask forwarded to ``scaled_dot_product_attention``, or None.

        Returns:
            A tuple ``(output, attention_weights)``.
        """
        # Project the inputs to d_model.
        q = self.WQ(q)  # q.shape: (batch_size, seq_len_q, d_model)
        k = self.WK(k)  # k.shape: (batch_size, seq_len_k, d_model)
        v = self.WV(v)  # v.shape: (batch_size, seq_len_v, d_model)

        # scaled_attention_out_puts.shape: (batch_size, seq_len_q, d_model)
        # attention_weights.shape: (batch_size, seq_len_q, seq_len_k)
        scaled_attention_out_puts, attention_weights = \
            scaled_dot_product_attention(q, k, v, mask)

        output = self.dense(scaled_attention_out_puts)
        return output, attention_weights
# Smoke test for OneHeadAttention.
temp_mha = OneHeadAttention(d_model=128)  # instantiate the layer
# Input is [batch_size, seq_len_q, dim]. The input dim does not matter: the
# Dense projections map it to d_model, which is the dim of the final output.
y = tf.random.uniform((1, 60, 64))
# Use the same tensor as q, k and v; calling the layer invokes call().
output, attention_weights = temp_mha(y, y, y, mask=None)
# MultiHeadAttention 实现
from tensorflow import keras
class MultiHeadAttention(keras.layers.Layer):
    """Multi-head attention layer built on scaled dot-product attention.

    Conceptually every head has its own projections:
        x --Wq0--> q0
        x --Wk0--> k0
        x --Wv0--> v0
    In pure self-attention q, k and v all come from the same input; in
    practice the three inputs may differ.

    The small per-head matrix multiplications are fused into one large one
    whose result is then split into heads:
        q --WQ (large matrix)--> Q --split--> q0, q1, q2
    and likewise for k and v.
    """

    def __init__(self, d_model, num_heads):
        """Create the projection layers.

        Args:
            d_model: total dimension after concatenating all heads; must be
                divisible by ``num_heads``.
            num_heads: number of attention heads.
        """
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads  # number of heads
        self.d_model = d_model  # dim of the concatenated heads
        # d_model must be an integer multiple of num_heads.
        assert self.d_model % self.num_heads == 0
        self.depth = self.d_model // self.num_heads  # per-head dim
        self.WQ = keras.layers.Dense(self.d_model)
        self.WK = keras.layers.Dense(self.d_model)
        self.WV = keras.layers.Dense(self.d_model)
        self.dense = keras.layers.Dense(self.d_model)

    def split_heads(self, x, batch_size):
        """Split the last axis into heads and move the head axis forward.

        Args:
            x: tensor of shape (batch_size, seq_len, d_model).
            batch_size: size of the leading axis of ``x``.

        Returns:
            Tensor of shape (batch_size, num_heads, seq_len, depth).
        """
        x = tf.reshape(
            x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, q, k, v, mask):
        """Project q, k, v, attend per head and merge the heads.

        Args:
            q: query input, (batch_size, seq_len_q, dim).
            k: key input, (batch_size, seq_len_k, dim).
            v: value input, (batch_size, seq_len_v, dim).
            mask: mask forwarded to ``scaled_dot_product_attention``, or None.

        Returns:
            A tuple ``(output, attention_weights)`` with shapes
            (batch_size, seq_len_q, d_model) and
            (batch_size, num_heads, seq_len_q, seq_len_k).
        """
        batch_size = tf.shape(q)[0]

        # Build the fused (all-heads) q, k, v matrices.
        q = self.WQ(q)  # q.shape: (batch_size, seq_len_q, d_model)
        k = self.WK(k)  # k.shape: (batch_size, seq_len_k, d_model)
        v = self.WV(v)  # v.shape: (batch_size, seq_len_v, d_model)

        # q.shape: (batch_size, num_heads, seq_len_q, depth)
        q = self.split_heads(q, batch_size)
        # k.shape: (batch_size, num_heads, seq_len_k, depth)
        k = self.split_heads(k, batch_size)
        # v.shape: (batch_size, num_heads, seq_len_v, depth)
        v = self.split_heads(v, batch_size)

        # scaled_attention_out_puts.shape:
        #     (batch_size, num_heads, seq_len_q, depth)
        # attention_weights.shape:
        #     (batch_size, num_heads, seq_len_q, seq_len_k)
        scaled_attention_out_puts, attention_weights = \
            scaled_dot_product_attention(q, k, v, mask)

        # before: (batch_size, num_heads, seq_len_q, depth)
        # after:  (batch_size, seq_len_q, num_heads, depth)
        scaled_attention_out_puts = tf.transpose(
            scaled_attention_out_puts, perm=[0, 2, 1, 3])

        # Collapse the num_heads axis, i.e. concatenate all heads.
        # concat_attention.shape: (batch_size, seq_len_q, d_model)
        concat_attention = tf.reshape(
            scaled_attention_out_puts, (batch_size, -1, self.d_model))

        output = self.dense(concat_attention)
        return output, attention_weights
# Smoke test for MultiHeadAttention.
temp_mha = MultiHeadAttention(d_model=512, num_heads=8)  # instantiate the layer
# Input is [batch_size, seq_len_q, dim]. The input dim does not matter: the
# Dense projections map it to d_model, which is the dim of the final output.
y = tf.random.uniform((1, 60, 256))
# Use the same tensor as q, k and v; calling the layer invokes call().
output, attention_weights = temp_mha(y, y, y, mask=None)
print(output)
print(attention_weights)
4. 通过 attention API 实现
# 使⽤ tf.keras.layers.Attention() 进⾏attention
# 存在问题,⽬前还没弄明⽩ mask 如何加,待更新。
def attention_test():
"""
batch_size: batch的⼤⼩,
q_len: 序列的长度
embedding_dim:序列中单个元素的向量纬度
embedding_dim:序列中单个元素的向量纬度
"""
# input layers
# input.shape :(batch_size, q_len)
input_query_char = tf.keras.layers.Input(shape=(maxlen,), name="input_q_char")
# 序列进⾏embedding
# emb_q_char.shape: (batch_size, q_len, embedding_dim)
emb_q_char = tf.keras.layers.Embedding(
input_dim=vocab_size,
output_dim=embedding_dim,
input_length=maxlen,
name='emb_q_char')(input_query_char)
print('----emb_q_char', emb_q_char.shape)
# 初始化 q, k
张家口战役# q.shape: (batch_size, q_len_q, embedding_dim)
# k.shape: (batch_size, q_len_k, embedding_dim)
WQ = keras.layers.Den(embedding_dim)
WK = keras.layers.Den(embedding_dim)
q = WQ(emb_q_char)
k = WK(emb_q_char)
# 调⽤attention层
梦见# attention_outputs.shape :(batch_size, q_len_q, embedding_dim)
# attention_weights.shape: (batch_size, q_len_q, q_len_k)
attention_outputs, attention_weights = tf.keras.layers.Attention()([q, k],
return_attention_scores=True)
print('---attention_outputs', attention_outputs.shape)
print('---attention weights', attention_weights.shape)
# Add&Norm :
# x 与 attention 进⾏进⾏concat
# 标注化处理, axis=1 ,表⽰在⾏上进⾏标准化处理
冬天儿童画# norm_data.shape: (batch_size, q_len, embedding_dim)
layer_normal = tf.keras.layers.LayerNormalization(epsilon=1e-6)
norm_data = layer_normal(emb_q_char + attention_outputs)
print('----norm_data shape', norm_data.shape)
# FeedWork:前馈神经⽹络
# diff :前馈神经⽹络的维度, 具体这个纬度如何定义需要确认下。
# ff_data.shape : (batch_size, q_len_q, embedding_dim)
fcc = tf.keras.layers.Den(1024, activation='relu', name='fcc')(norm_data)
ff_data = tf.keras.layers.Den(embedding_dim)(fcc)
print("----fcc shape", fcc.shape)
print("----ff data", ff_data.shape)
# 结果进⾏ sum pool
# attention_sum_pool.shape:(batch_size,embedding_dim )
attention_sum_pool = tf.reduce_sum(ff_data,1)
print("----attention_sum shape", attention_sum_pool.shape)
# 关于attention最后结果处理两种⽅法:
# ⽅法1:经过feed work net,然后sum pool
# ⽅法2: attention结果sum pool + input进⾏mean pool (后期再验证)
# DNN 建模
query_merge_vector = tf.atenate([
attention_sum_pool
])
query_vector_l1 = tf.keras.layers.Den(100, activation='relu', name='query_d1')(query_merge_vector) query_vector_l2 = tf.keras.layers.Den(100, activation='relu',
name="query_d4", kernel_regularizer='l2')(query_vector_l1)
output = tf.keras.layers.Den(1, activation='sigmoid', name='output')(query_vector_l2)
model = dels.Model(
inputs=[input_query_char],