Keras (34): Transformer Model Implementation, Part 1 - Loading the Data
0. Import the deep learning packages
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf
from tensorflow import keras
print(tf.__version__)
print(sys.version_info)
for module in mpl, np, pd, sklearn, tf, keras:
    print(module.__name__, module.__version__)
# 1. loads data
# 2. preprocess data -> datat
# 3. tools
# 3.1 generates position embedding
# 3.2 create mask. (a. padding, b. decoder)
# 3.3 scaled_dot_product_attention
# 4. builds model
# 4.1 MultiheadAttention
# 4.2 EncoderLayer
# 4.3 DecoderLayer
# 4.4 EncoderModel
# 4.5 DecoderModel
# 4.6 Transformer
# 5. optimizer & loss
# 6. train step -> train
# 7. Evaluate and Visualize
1. Download the data, and place the tensorflow_datasets folder in your home directory
import tensorflow_datasets as tfds

examples, info = tfds.load('ted_hrlr_translate/pt_to_en',
                           with_info=True,
                           as_supervised=True)
2. Split the raw data into training and validation datasets
train_examples, val_examples = examples['train'], examples['validation']
print(info)
3. Inspect a few Portuguese-English examples from the training data
for pt, en in train_examples.take(5):
    print(pt.numpy())
    print(en.numpy())
    print()
Part 2: Data Preprocessing
1. Define and build the tokenizers
# In tensorflow_datasets >= 4.0 SubwordTextEncoder lives under tfds.deprecated.text
en_tokenizer = tfds.features.text.SubwordTextEncoder.build_from_corpus(
    (en.numpy() for pt, en in train_examples),
    target_vocab_size=2**13)
pt_tokenizer = tfds.features.text.SubwordTextEncoder.build_from_corpus(
    (pt.numpy() for pt, en in train_examples),
    target_vocab_size=2**13)
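A quick sanity check of the resulting vocabularies (the exact sizes are data-dependent, but with target_vocab_size = 2**13 they typically land a little above 8000); these sizes are reused below as the ids of the start and end tokens:
print(pt_tokenizer.vocab_size, en_tokenizer.vocab_size)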
2. Test the tokenizer's encode and decode
sample_string ="Transformer is awesome."
tokenized_string = en_tokenizer.encode(sample_string)
print('Tokenized string is {}'.format(tokenized_string))
origin_string = en_tokenizer.decode(tokenized_string)
print('The original string is {}'.format(origin_string))
assert origin_string == sample_string
for token in tokenized_string:
    print('{} --> "{}"'.format(token, en_tokenizer.decode([token])))
3. Convert the raw data to subwords, shuffle, filter out sentences longer than 40 tokens, and pad within each batch
buffer_size =20000
batch_size =64
max_length =40
def encode_to_subword(pt_sentence, en_sentence):
    pt_sequence = [pt_tokenizer.vocab_size] + pt_tokenizer.encode(pt_sentence.numpy()) + [pt_tokenizer.vocab_size + 1]
    en_sequence = [en_tokenizer.vocab_size] + en_tokenizer.encode(en_sentence.numpy()) + [en_tokenizer.vocab_size + 1]
    return pt_sequence, en_sequence
def filter_by_max_length(pt, en):
    """Keep only sentence pairs where both sides have at most 40 tokens."""
    return tf.logical_and(tf.size(pt) <= max_length,
                          tf.size(en) <= max_length)
def tf_encode_to_subword(pt_sentence, en_sentence):
    """Wrap [encode_to_subword] so it can be used as a TensorFlow op."""
    return tf.py_function(encode_to_subword,
                          [pt_sentence, en_sentence],
                          [tf.int64, tf.int64])
train_dataset = train_examples.map(tf_encode_to_subword)
train_dataset = train_dataset.filter(filter_by_max_length)
train_dataset = train_dataset.shuffle(
    buffer_size).padded_batch(
    batch_size, padded_shapes=([-1], [-1]))

valid_dataset = val_examples.map(tf_encode_to_subword)
valid_dataset = valid_dataset.filter(
    filter_by_max_length).padded_batch(
    batch_size, padded_shapes=([-1], [-1]))
4. Inspect a batch after preprocessing
for pt_batch, en_batch in valid_dataset.take(1):
    print(pt_batch.shape, en_batch.shape)
    print(pt_batch)
    print()
    print(en_batch)
Part 3: Positional Encoding - Generating the Position Embedding
1. Formula for the position embedding matrix
PE(pos, 2i)= sin(pos /10000^(2i/d_model))
PE(pos, 2i+1)= cos(pos /10000^(2i/d_model))
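For example, with d_model = 512, pos = 1 and i = 0 this gives PE(1, 0) = sin(1 / 10000^0) = sin(1.0) ≈ 0.8415 and PE(1, 1) = cos(1 / 10000^0) = cos(1.0) ≈ 0.5403.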
2. Generate the position embedding
# pos.shape: [sentence_length, 1]
# i.shape: [1, d_model]
# result.shape: [sentence_length, d_model]
def get_angles(pos, i, d_model):
    """Compute the embedding angle for position pos and dimension index i."""
    angle_rates = 1 / np.power(10000,
                               (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates
def get_position_embedding(sentence_length, d_model):
    """Return the position embedding matrix."""
    angle_rads = get_angles(np.arange(sentence_length)[:, np.newaxis],
                            np.arange(d_model)[np.newaxis, :],
                            d_model)
    # sines.shape: [sentence_length, d_model / 2]
    # cosines.shape: [sentence_length, d_model / 2]
    sines = np.sin(angle_rads[:, 0::2])
    cosines = np.cos(angle_rads[:, 1::2])
    # position_embedding.shape: [sentence_length, d_model]
    position_embedding = np.concatenate([sines, cosines], axis=-1)

    # Alternative (interleaved) implementation:
    # angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    # angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    # position_embedding = angle_rads

    # position_embedding.shape: [1, sentence_length, d_model]
    position_embedding = position_embedding[np.newaxis, ...]
    return tf.cast(position_embedding, dtype=tf.float32)
position_embedding = get_position_embedding(50,512)
print(position_embedding.shape)
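Note that this implementation concatenates all sine columns before all cosine columns along the last axis (instead of interleaving them as in the formula above), so the first d_model / 2 columns hold the sines and the rest the cosines. A quick check against the worked example above:
print(position_embedding[0, 1, 0].numpy())    # first sine column at pos = 1 -> sin(1) ≈ 0.8415
print(position_embedding[0, 1, 256].numpy())  # first cosine column at pos = 1 -> cos(1) ≈ 0.5403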
3. Plot the position embedding matrix
def plot_position_embedding(position_embedding):
    plt.pcolormesh(position_embedding[0], cmap='RdBu')
    plt.xlabel('Depth')
    plt.xlim((0, 512))
    plt.ylabel('Position')
    plt.show()
# plot_position_embedding(position_embedding)
Part 4: Building the Masks
"""
1. padding mask
2. look-ahead mask
"""
1. Padding mask
# batch_data.shape: [batch_size, seq_len]
def create_padding_mask(batch_data):
    padding_mask = tf.cast(tf.math.equal(batch_data, 0), tf.float32)
    # padding_mask.shape: [batch_size, 1, 1, seq_len]
    return padding_mask[:, tf.newaxis, tf.newaxis, :]
x = tf.constant([[7,6,0,0,1],[1,2,3,0,0],[0,0,0,4,5]])
create_padding_mask(x)
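For this input the mask marks exactly the positions whose token id is 0, so the result has shape (3, 1, 1, 5) and should look like:
# [[[[0., 0., 1., 1., 0.]]],
#  [[[0., 0., 0., 1., 1.]]],
#  [[[1., 1., 1., 0., 0.]]]]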
2. Look-ahead mask for the decoder
# attention_weights.shape: [3,3]
# [[1, 0, 0],
# [4, 5, 0],
# [7, 8, 9]]
def create_look_ahead_mask(size):
    mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return mask  # (seq_len, seq_len)
create_look_ahead_mask(3)
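Each position may attend to itself and to earlier positions, so the mask zeros out the lower triangle (including the diagonal) and sets everything above it to 1; create_look_ahead_mask(3) should therefore return:
# [[0., 1., 1.],
#  [0., 0., 1.],
#  [0., 0., 0.]]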
Part 5: Scaled Dot-Product Attention
1. Define the scaled dot-product attention function
def scaled_dot_product_attention(q, k, v, mask):
    """
    Args:
    - q: shape == (..., seq_len_q, depth)
    - k: shape == (..., seq_len_k, depth)
    - v: shape == (..., seq_len_v, depth_v)
    - seq_len_k == seq_len_v
    - mask: shape == (..., seq_len_q, seq_len_k)
    Returns:
    - output: weighted sum
    - attention_weights: weights of attention
    """
    # matmul_qk.shape: (..., seq_len_q, seq_len_k)
    matmul_qk = tf.matmul(q, k, transpose_b=True)

    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    if mask is not None:
        # Make the masked positions vanish after softmax.
        # In the mask, positions that should be ignored are set to 1 and positions
        # that should be kept are set to 0. Wherever the mask is 1 we add a huge
        # negative number (-1e9) to the corresponding logit, so the exponential of
        # that logit is effectively 0 after softmax and its attention weight is 0.
        scaled_attention_logits += (mask * -1e9)

    # attention_weights.shape: (..., seq_len_q, seq_len_k)
    attention_weights = tf.nn.softmax(
        scaled_attention_logits, axis=-1)
    # output.shape: (..., seq_len_q, depth_v)
    output = tf.matmul(attention_weights, v)
    return output, attention_weights
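To see the masking trick in isolation, here is a minimal standalone check with toy values (chosen only for illustration): a logit whose mask entry is 1 gets -1e9 added and its softmax weight collapses to roughly 0.
logits = tf.constant([[1.0, 2.0, 3.0]])
mask = tf.constant([[0.0, 0.0, 1.0]])        # mark the last position as "ignore"
print(tf.nn.softmax(logits + mask * -1e9))   # ≈ [[0.269, 0.731, 0.]]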
2. Call scaled dot-product attention and print the weights and output
def print_scaled_dot_product_attention(q, k, v):
    temp_out, temp_att = scaled_dot_product_attention(q, k, v, None)
    print("Attention weights are:")
    print(temp_att)
    print("Output is:")
    print(temp_out)
3. Test
temp_k = tf.constant([[10,0,0],
[0,10,0],
[0,0,10],
[0,0,10]], dtype=tf.float32)# (4, 3)
temp_v = tf.constant([[1,0],
[10,0],
[100,5],
[1000,6]], dtype=tf.float32)# (4, 2)
temp_q1 = tf.constant([[0,10,0]], dtype=tf.float32)# (1, 3)
np.set_printoptions(suppress=True)
print_scaled_dot_product_attention(temp_q1, temp_k, temp_v)
temp_q2 = tf.constant([[0,0,10]], dtype=tf.float32)# (1, 3)
print_scaled_dot_product_attention(temp_q2, temp_k, temp_v)
temp_q3 = tf.constant([[10,10,0]], dtype=tf.float32)# (1, 3)
print_scaled_dot_product_attention(temp_q3, temp_k, temp_v)
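Since the logit for a matching key is far larger than for a non-matching one, softmax puts essentially all of the weight on the matching keys, so the outputs should come out approximately as follows: temp_q1 matches only the second key, giving an output of roughly [10, 0]; temp_q2 matches the last two keys equally, so the output averages their values to roughly [550, 5.5]; temp_q3 matches the first two keys equally, giving roughly [5.5, 0]. The batched query temp_q4 below simply stacks these three queries, so its output stacks the corresponding rows.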
temp_q4 = tf.constant([[0,10,0],
[0,0,10],
[10,10,0]], dtype=tf.float32)# (3, 3)
print_scaled_dot_product_attention(temp_q4, temp_k, temp_v)

Part 6: Multi-Head Attention
1. Define the multi-head attention class
class MultiHeadAttention(keras.layers.Layer):
    """
    In theory:
    x -> Wq0 -> q0
    x -> Wk0 -> k0
    x -> Wv0 -> v0
    In practice:
    q -> Wq0 -> q0
    k -> Wk0 -> k0
    v -> Wv0 -> v0
    Practical trick - project once, then split into heads:
    q -> Wq -> Q -> split -> q0, q1, q2...
    """
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert self.d_model % self.num_heads == 0
        self.depth = self.d_model // self.num_heads

        self.WQ = keras.layers.Dense(self.d_model)
        self.WK = keras.layers.Dense(self.d_model)
        self.WV = keras.layers.Dense(self.d_model)
        self.dense = keras.layers.Dense(self.d_model)

    def split_heads(self, x, batch_size):