# 최종 버전
class PositionalEncoding(tf.keras.layers.Layer):
def __init__(self, position, d_model):
super(PositionalEncoding, self).__init__()
self.pos_encoding = self.positional_encoding(position, d_model)
def get_angles(self, position, i, d_model):
= 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
angles return position * angles
def positional_encoding(self, position, d_model):
= self.get_angles(
angle_rads =tf.range(position, dtype=tf.float32)[:, tf.newaxis],
position=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
i=d_model)
d_model
# 배열의 짝수 인덱스(2i)에는 사인 함수 적용
= tf.math.sin(angle_rads[:, 0::2])
sines
# 배열의 홀수 인덱스(2i+1)에는 코사인 함수 적용
= tf.math.cos(angle_rads[:, 1::2])
cosines
= np.zeros(angle_rads.shape)
angle_rads 0::2] = sines
angle_rads[:, 1::2] = cosines
angle_rads[:, = tf.constant(angle_rads)
pos_encoding = pos_encoding[tf.newaxis, ...]
pos_encoding
print(pos_encoding.shape)
return tf.cast(pos_encoding, tf.float32)
def call(self, inputs):
return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]
Transformer Chatbot Tutorial
Ref: 딥 러닝을 이용한 자연어 처리 입문
Import
= PositionalEncoding(50, 128)
sample_pos_encoding
0], cmap='RdBu')
plt.pcolormesh(sample_pos_encoding.pos_encoding.numpy()['Depth')
plt.xlabel(0, 128))
plt.xlim(('Position')
plt.ylabel(
plt.colorbar() plt.show()
(1, 50, 128)
def scaled_dot_product_attention(query, key, value, mask):
# query 크기 : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
# key 크기 : (batch_size, num_heads, key의 문장 길이, d_model/num_heads)
# value 크기 : (batch_size, num_heads, value의 문장 길이, d_model/num_heads)
# padding_mask : (batch_size, 1, 1, key의 문장 길이)
# Q와 K의 곱. 어텐션 스코어 행렬.
= tf.matmul(query, key, transpose_b=True)
matmul_qk
# 스케일링
# dk의 루트값으로 나눠준다.
= tf.cast(tf.shape(key)[-1], tf.float32)
depth = matmul_qk / tf.math.sqrt(depth)
logits
# 마스킹. 어텐션 스코어 행렬의 마스킹 할 위치에 매우 작은 음수값을 넣는다.
# 매우 작은 값이므로 소프트맥스 함수를 지나면 행렬의 해당 위치의 값은 0이 된다.
if mask is not None:
+= (mask * -1e9)
logits
# 소프트맥스 함수는 마지막 차원인 key의 문장 길이 방향으로 수행된다.
# attention weight : (batch_size, num_heads, query의 문장 길이, key의 문장 길이)
= tf.nn.softmax(logits, axis=-1)
attention_weights
# output : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
= tf.matmul(attention_weights, value)
output
return output, attention_weights
class MultiHeadAttention(tf.keras.layers.Layer):
def __init__(self, d_model, num_heads, name="multi_head_attention"):
super(MultiHeadAttention, self).__init__(name=name)
self.num_heads = num_heads
self.d_model = d_model
assert d_model % self.num_heads == 0
# d_model을 num_heads로 나눈 값.
# 논문 기준 : 64
self.depth = d_model // self.num_heads
# WQ, WK, WV에 해당하는 밀집층 정의
self.query_dense = tf.keras.layers.Dense(units=d_model)
self.key_dense = tf.keras.layers.Dense(units=d_model)
self.value_dense = tf.keras.layers.Dense(units=d_model)
# WO에 해당하는 밀집층 정의
self.dense = tf.keras.layers.Dense(units=d_model)
# num_heads 개수만큼 q, k, v를 split하는 함수
def split_heads(self, inputs, batch_size):
= tf.reshape(
inputs =(batch_size, -1, self.num_heads, self.depth))
inputs, shapereturn tf.transpose(inputs, perm=[0, 2, 1, 3])
def call(self, inputs):
= inputs['query'], inputs['key'], inputs[
query, key, value, mask 'value'], inputs['mask']
= tf.shape(query)[0]
batch_size
# 1. WQ, WK, WV에 해당하는 밀집층 지나기
# q : (batch_size, query의 문장 길이, d_model)
# k : (batch_size, key의 문장 길이, d_model)
# v : (batch_size, value의 문장 길이, d_model)
# 참고) 인코더(k, v)-디코더(q) 어텐션에서는 query 길이와 key, value의 길이는 다를 수 있다.
= self.query_dense(query)
query = self.key_dense(key)
key = self.value_dense(value)
value
# 2. 헤드 나누기
# q : (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
# k : (batch_size, num_heads, key의 문장 길이, d_model/num_heads)
# v : (batch_size, num_heads, value의 문장 길이, d_model/num_heads)
= self.split_heads(query, batch_size)
query = self.split_heads(key, batch_size)
key = self.split_heads(value, batch_size)
value
# 3. 스케일드 닷 프로덕트 어텐션. 앞서 구현한 함수 사용.
# (batch_size, num_heads, query의 문장 길이, d_model/num_heads)
= scaled_dot_product_attention(query, key, value, mask)
scaled_attention, _ # (batch_size, query의 문장 길이, num_heads, d_model/num_heads)
= tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
scaled_attention
# 4. 헤드 연결(concatenate)하기
# (batch_size, query의 문장 길이, d_model)
= tf.reshape(scaled_attention,
concat_attention -1, self.d_model))
(batch_size,
# 5. WO에 해당하는 밀집층 지나기
# (batch_size, query의 문장 길이, d_model)
= self.dense(concat_attention)
outputs
return outputs
def create_padding_mask(x):
= tf.cast(tf.math.equal(x, 0), tf.float32)
mask # (batch_size, 1, 1, key의 문장 길이)
return mask[:, tf.newaxis, tf.newaxis, :]
def encoder_layer(dff, d_model, num_heads, dropout, name="encoder_layer"):
= tf.keras.Input(shape=(None, d_model), name="inputs")
inputs
# 인코더는 패딩 마스크 사용
= tf.keras.Input(shape=(1, 1, None), name="padding_mask")
padding_mask
# 멀티-헤드 어텐션 (첫번째 서브층 / 셀프 어텐션)
= MultiHeadAttention(
attention ="attention")({
d_model, num_heads, name'query': inputs, 'key': inputs, 'value': inputs, # Q = K = V
'mask': padding_mask # 패딩 마스크 사용
})
# 드롭아웃 + 잔차 연결과 층 정규화
= tf.keras.layers.Dropout(rate=dropout)(attention)
attention = tf.keras.layers.LayerNormalization(
attention =1e-6)(inputs + attention)
epsilon
# 포지션 와이즈 피드 포워드 신경망 (두번째 서브층)
= tf.keras.layers.Dense(units=dff, activation='relu')(attention)
outputs = tf.keras.layers.Dense(units=d_model)(outputs)
outputs
# 드롭아웃 + 잔차 연결과 층 정규화
= tf.keras.layers.Dropout(rate=dropout)(outputs)
outputs = tf.keras.layers.LayerNormalization(
outputs =1e-6)(attention + outputs)
epsilon
return tf.keras.Model(
=[inputs, padding_mask], outputs=outputs, name=name) inputs
def encoder(vocab_size, num_layers, dff,
d_model, num_heads, dropout,="encoder"):
name= tf.keras.Input(shape=(None,), name="inputs")
inputs
# 인코더는 패딩 마스크 사용
= tf.keras.Input(shape=(1, 1, None), name="padding_mask")
padding_mask
# 포지셔널 인코딩 + 드롭아웃
= tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
embeddings = tf.keras.layers.Dropout(rate=dropout)(embeddings)
outputs
# 인코더를 num_layers개 쌓기
for i in range(num_layers):
= encoder_layer(dff=dff, d_model=d_model, num_heads=num_heads,
outputs =dropout, name="encoder_layer_{}".format(i),
dropout
)([outputs, padding_mask])
return tf.keras.Model(
=[inputs, padding_mask], outputs=outputs, name=name) inputs
# 디코더의 첫번째 서브층(sublayer)에서 미래 토큰을 Mask하는 함수
def create_look_ahead_mask(x):
= tf.shape(x)[1]
seq_len = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
look_ahead_mask = create_padding_mask(x) # 패딩 마스크도 포함
padding_mask return tf.maximum(look_ahead_mask, padding_mask)
def decoder_layer(dff, d_model, num_heads, dropout, name="decoder_layer"):
= tf.keras.Input(shape=(None, d_model), name="inputs")
inputs = tf.keras.Input(shape=(None, d_model), name="encoder_outputs")
enc_outputs
# 디코더는 룩어헤드 마스크(첫번째 서브층)와 패딩 마스크(두번째 서브층) 둘 다 사용.
= tf.keras.Input(
look_ahead_mask =(1, None, None), name="look_ahead_mask")
shape= tf.keras.Input(shape=(1, 1, None), name='padding_mask')
padding_mask
# 멀티-헤드 어텐션 (첫번째 서브층 / 마스크드 셀프 어텐션)
= MultiHeadAttention(
attention1 ="attention_1")(inputs={
d_model, num_heads, name'query': inputs, 'key': inputs, 'value': inputs, # Q = K = V
'mask': look_ahead_mask # 룩어헤드 마스크
})
# 잔차 연결과 층 정규화
= tf.keras.layers.LayerNormalization(
attention1 =1e-6)(attention1 + inputs)
epsilon
# 멀티-헤드 어텐션 (두번째 서브층 / 디코더-인코더 어텐션)
= MultiHeadAttention(
attention2 ="attention_2")(inputs={
d_model, num_heads, name'query': attention1, 'key': enc_outputs, 'value': enc_outputs, # Q != K = V
'mask': padding_mask # 패딩 마스크
})
# 드롭아웃 + 잔차 연결과 층 정규화
= tf.keras.layers.Dropout(rate=dropout)(attention2)
attention2 = tf.keras.layers.LayerNormalization(
attention2 =1e-6)(attention2 + attention1)
epsilon
# 포지션 와이즈 피드 포워드 신경망 (세번째 서브층)
= tf.keras.layers.Dense(units=dff, activation='relu')(attention2)
outputs = tf.keras.layers.Dense(units=d_model)(outputs)
outputs
# 드롭아웃 + 잔차 연결과 층 정규화
= tf.keras.layers.Dropout(rate=dropout)(outputs)
outputs = tf.keras.layers.LayerNormalization(
outputs =1e-6)(outputs + attention2)
epsilon
return tf.keras.Model(
=[inputs, enc_outputs, look_ahead_mask, padding_mask],
inputs=outputs,
outputs=name) name
def decoder(vocab_size, num_layers, dff,
d_model, num_heads, dropout,='decoder'):
name= tf.keras.Input(shape=(None,), name='inputs')
inputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')
enc_outputs
# 디코더는 룩어헤드 마스크(첫번째 서브층)와 패딩 마스크(두번째 서브층) 둘 다 사용.
= tf.keras.Input(
look_ahead_mask =(1, None, None), name='look_ahead_mask')
shape= tf.keras.Input(shape=(1, 1, None), name='padding_mask')
padding_mask
# 포지셔널 인코딩 + 드롭아웃
= tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
embeddings = tf.keras.layers.Dropout(rate=dropout)(embeddings)
outputs
# 디코더를 num_layers개 쌓기
for i in range(num_layers):
= decoder_layer(dff=dff, d_model=d_model, num_heads=num_heads,
outputs =dropout, name='decoder_layer_{}'.format(i),
dropout=[outputs, enc_outputs, look_ahead_mask, padding_mask])
)(inputs
return tf.keras.Model(
=[inputs, enc_outputs, look_ahead_mask, padding_mask],
inputs=outputs,
outputs=name) name
def transformer(vocab_size, num_layers, dff,
d_model, num_heads, dropout,="transformer"):
name
# 인코더의 입력
= tf.keras.Input(shape=(None,), name="inputs")
inputs
# 디코더의 입력
= tf.keras.Input(shape=(None,), name="dec_inputs")
dec_inputs
# 인코더의 패딩 마스크
= tf.keras.layers.Lambda(
enc_padding_mask =(1, 1, None),
create_padding_mask, output_shape='enc_padding_mask')(inputs)
name
# 디코더의 룩어헤드 마스크(첫번째 서브층)
= tf.keras.layers.Lambda(
look_ahead_mask =(1, None, None),
create_look_ahead_mask, output_shape='look_ahead_mask')(dec_inputs)
name
# 디코더의 패딩 마스크(두번째 서브층)
= tf.keras.layers.Lambda(
dec_padding_mask =(1, 1, None),
create_padding_mask, output_shape='dec_padding_mask')(inputs)
name
# 인코더의 출력은 enc_outputs. 디코더로 전달된다.
= encoder(vocab_size=vocab_size, num_layers=num_layers, dff=dff,
enc_outputs =d_model, num_heads=num_heads, dropout=dropout,
d_model=[inputs, enc_padding_mask]) # 인코더의 입력은 입력 문장과 패딩 마스크
)(inputs
# 디코더의 출력은 dec_outputs. 출력층으로 전달된다.
= decoder(vocab_size=vocab_size, num_layers=num_layers, dff=dff,
dec_outputs =d_model, num_heads=num_heads, dropout=dropout,
d_model=[dec_inputs, enc_outputs, look_ahead_mask, dec_padding_mask])
)(inputs
# 다음 단어 예측을 위한 출력층
= tf.keras.layers.Dense(units=vocab_size, name="outputs")(dec_outputs)
outputs
return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)
= transformer(
small_transformer = 9000,
vocab_size = 4,
num_layers = 512,
dff = 128,
d_model = 4,
num_heads = 0.3,
dropout ="small_transformer")
name
tf.keras.utils.plot_model(='small_transformer.png', show_shapes=True) small_transformer, to_file
(1, 9000, 128)
(1, 9000, 128)
You must install pydot (`pip install pydot`) and install graphviz (see instructions at https://graphviz.gitlab.io/download/) for plot_model to work.
def loss_function(y_true, y_pred):
= tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
y_true
= tf.keras.losses.SparseCategoricalCrossentropy(
loss =True, reduction='none')(y_true, y_pred)
from_logits
= tf.cast(tf.not_equal(y_true, 0), tf.float32)
mask = tf.multiply(loss, mask)
loss
return tf.reduce_mean(loss)
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
def __init__(self, d_model, warmup_steps=4000):
super(CustomSchedule, self).__init__()
self.d_model = d_model
self.d_model = tf.cast(self.d_model, tf.float32)
self.warmup_steps = warmup_steps
def __call__(self, step):
= tf.math.rsqrt(step)
arg1 = step * (self.warmup_steps**-1.5)
arg2
return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)
= CustomSchedule(d_model=128)
sample_learning_rate
range(200000, dtype=tf.float32)))
plt.plot(sample_learning_rate(tf."Learning Rate")
plt.ylabel("Train Step") plt.xlabel(
Text(0.5, 0, 'Train Step')
Data load
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import re
import urllib.request
import time
import tensorflow_datasets as tfds
import tensorflow as tf
"https://raw.githubusercontent.com/songys/Chatbot_data/master/ChatbotData.csv", filename="ChatBotData.csv")
urllib.request.urlretrieve(= pd.read_csv('ChatBotData.csv')
train_data train_data.head()
Q | A | label | |
---|---|---|---|
0 | 12시 땡! | 하루가 또 가네요. | 0 |
1 | 1지망 학교 떨어졌어 | 위로해 드립니다. | 0 |
2 | 3박4일 놀러가고 싶다 | 여행은 언제나 좋죠. | 0 |
3 | 3박4일 정도 놀러가고 싶다 | 여행은 언제나 좋죠. | 0 |
4 | PPL 심하네 | 눈살이 찌푸려지죠. | 0 |
print('챗봇 샘플의 개수 :', len(train_data))
챗봇 샘플의 개수 : 11823
NULL 값 확인
print(train_data.isnull().sum())
Q 0
A 0
label 0
dtype: int64
?, ., !
과 같은 구두점이 있는데 공백을 추가하여 다른 문자들과 구별
= []
questions for sentence in train_data['Q']:
# 구두점에 대해서 띄어쓰기
# ex) 12시 땡! -> 12시 땡 !
= re.sub(r"([?.!,])", r" \1 ", sentence)
sentence = sentence.strip()
sentence questions.append(sentence)
= []
answers for sentence in train_data['A']:
# 구두점에 대해서 띄어쓰기
# ex) 12시 땡! -> 12시 땡 !
= re.sub(r"([?.!,])", r" \1 ", sentence)
sentence = sentence.strip()
sentence answers.append(sentence)
print(questions[:5])
print(answers[:5])
['12시 땡 !', '1지망 학교 떨어졌어', '3박4일 놀러가고 싶다', '3박4일 정도 놀러가고 싶다', 'PPL 심하네']
['하루가 또 가네요 .', '위로해 드립니다 .', '여행은 언제나 좋죠 .', '여행은 언제나 좋죠 .', '눈살이 찌푸려지죠 .']
Make Words Group
# 서브워드텍스트인코더를 사용하여 질문, 답변 데이터로부터 단어 집합(Vocabulary) 생성
= tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
tokenizer + answers, target_vocab_size=2**13) questions
# 시작 토큰과 종료 토큰에 대한 정수 부여.
= [tokenizer.vocab_size], [tokenizer.vocab_size + 1]
START_TOKEN, END_TOKEN
# 시작 토큰과 종료 토큰을 고려하여 단어 집합의 크기를 + 2
= tokenizer.vocab_size + 2 VOCAB_SIZE
print('시작 토큰 번호 :',START_TOKEN)
print('종료 토큰 번호 :',END_TOKEN)
print('단어 집합의 크기 :',VOCAB_SIZE)
시작 토큰 번호 : [8178]
종료 토큰 번호 : [8179]
단어 집합의 크기 : 8180
정수 인코딩과 패딩
# 서브워드텍스트인코더 토크나이저의 .encode()를 사용하여 텍스트 시퀀스를 정수 시퀀스로 변환.
print('임의의 질문 샘플을 정수 인코딩 : {}'.format(tokenizer.encode(questions[20])))
임의의 질문 샘플을 정수 인코딩 : [5766, 611, 3509, 141, 685, 3747, 849]
# 서브워드텍스트인코더 토크나이저의 .encode()와 .decode() 테스트해보기
# 임의의 입력 문장을 sample_string에 저장
= questions[20]
sample_string
# encode() : 텍스트 시퀀스 --> 정수 시퀀스
= tokenizer.encode(sample_string)
tokenized_string print ('정수 인코딩 후의 문장 {}'.format(tokenized_string))
# decode() : 정수 시퀀스 --> 텍스트 시퀀스
= tokenizer.decode(tokenized_string)
original_string print ('기존 문장: {}'.format(original_string))
정수 인코딩 후의 문장 [5766, 611, 3509, 141, 685, 3747, 849]
기존 문장: 가스비 비싼데 감기 걸리겠어
# 각 정수는 각 단어와 어떻게 mapping되는지 병렬로 출력
# 서브워드텍스트인코더는 의미있는 단위의 서브워드로 토크나이징한다. 띄어쓰기 단위 X 형태소 분석 단위 X
for ts in tokenized_string:
print ('{} ----> {}'.format(ts, tokenizer.decode([ts])))
5766 ----> 가스
611 ----> 비
3509 ----> 비싼
141 ----> 데
685 ----> 감기
3747 ----> 걸리
849 ----> 겠어
# 최대 길이를 40으로 정의
= 40
MAX_LENGTH
# 토큰화 / 정수 인코딩 / 시작 토큰과 종료 토큰 추가 / 패딩
def tokenize_and_filter(inputs, outputs):
= [], []
tokenized_inputs, tokenized_outputs
for (sentence1, sentence2) in zip(inputs, outputs):
# encode(토큰화 + 정수 인코딩), 시작 토큰과 종료 토큰 추가
= START_TOKEN + tokenizer.encode(sentence1) + END_TOKEN
sentence1 = START_TOKEN + tokenizer.encode(sentence2) + END_TOKEN
sentence2
tokenized_inputs.append(sentence1)
tokenized_outputs.append(sentence2)
# 패딩
= tf.keras.preprocessing.sequence.pad_sequences(
tokenized_inputs =MAX_LENGTH, padding='post')
tokenized_inputs, maxlen= tf.keras.preprocessing.sequence.pad_sequences(
tokenized_outputs =MAX_LENGTH, padding='post')
tokenized_outputs, maxlen
return tokenized_inputs, tokenized_outputs
= tokenize_and_filter(questions, answers) questions, answers
질문과 답변의 길이 변환
print('질문 데이터의 크기(shape) :', questions.shape)
print('답변 데이터의 크기(shape) :', answers.shape)
질문 데이터의 크기(shape) : (1, 40)
답변 데이터의 크기(shape) : (1, 40)
# 0번 샘플을 임의로 출력
print(questions[0])
print(answers[0])
[8178 7915 4207 3060 41 8179 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0]
[8178 3844 74 7894 1 8179 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0]
길이 40을 맞추기 위해 0이 패딩된 것 확인
Input Encoders and Decoders and make Label
# 텐서플로우 dataset을 이용하여 셔플(shuffle)을 수행하되, 배치 크기로 데이터를 묶는다.
# 또한 이 과정에서 교사 강요(teacher forcing)을 사용하기 위해서 디코더의 입력과 실제값 시퀀스를 구성한다.
= 64
BATCH_SIZE = 20000
BUFFER_SIZE
# 디코더의 실제값 시퀀스에서는 시작 토큰을 제거해야 한다.
= tf.data.Dataset.from_tensor_slices((
dataset
{'inputs': questions,
'dec_inputs': answers[:, :-1] # 디코더의 입력. 마지막 패딩 토큰이 제거된다.
},
{'outputs': answers[:, 1:] # 맨 처음 토큰이 제거된다. 다시 말해 시작 토큰이 제거된다.
},
))
= dataset.cache()
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE)
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE) dataset
# 임의의 샘플에 대해서 [:, :-1]과 [:, 1:]이 어떤 의미를 가지는지 테스트해본다.
print(answers[0]) # 기존 샘플
print(answers[:1][:, :-1]) # 마지막 패딩 토큰 제거하면서 길이가 39가 된다.
print(answers[:1][:, 1:]) # 맨 처음 토큰이 제거된다. 다시 말해 시작 토큰이 제거된다. 길이는 역시 39가 된다.
[8178 3844 74 7894 1 8179 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0]
[[8178 3844 74 7894 1 8179 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0]]
[[3844 74 7894 1 8179 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0]]
Make Transformer
- \(d_{model}\) = 256
- num_layers = 2
- num_heads = 8
- \(d_{ff}\) = 512
tf.keras.backend.clear_session()
# 하이퍼파라미터
= 256
D_MODEL = 2
NUM_LAYERS = 8
NUM_HEADS = 512
DFF = 0.1
DROPOUT
= transformer(
model =VOCAB_SIZE,
vocab_size=NUM_LAYERS,
num_layers=DFF,
dff=D_MODEL,
d_model=NUM_HEADS,
num_heads=DROPOUT) dropout
(1, 8180, 256)
(1, 8180, 256)
= CustomSchedule(D_MODEL)
learning_rate
= tf.keras.optimizers.Adam(
optimizer =0.001, beta_1=0.9, beta_2=0.98, epsilon=1e-9)
learning_rate
def accuracy(y_true, y_pred):
# 레이블의 크기는 (batch_size, MAX_LENGTH - 1)
= tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
y_true return tf.keras.metrics.sparse_categorical_accuracy(y_true, y_pred)
compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy]) model.
= 50
EPOCHS =EPOCHS,verbose=False) model.fit(dataset, epochs
<keras.callbacks.History at 0x7f2207d9cf10>
Chatbot Evaluation
def preprocess_sentence(sentence):
# 단어와 구두점 사이에 공백 추가.
# ex) 12시 땡! -> 12시 땡 !
= re.sub(r"([?.!,])", r" \1 ", sentence)
sentence = sentence.strip()
sentence return sentence
def evaluate(sentence):
# 입력 문장에 대한 전처리
= preprocess_sentence(sentence)
sentence
# 입력 문장에 시작 토큰과 종료 토큰을 추가
= tf.expand_dims(
sentence + tokenizer.encode(sentence) + END_TOKEN, axis=0)
START_TOKEN
= tf.expand_dims(START_TOKEN, 0)
output
# 디코더의 예측 시작
for i in range(MAX_LENGTH):
= model(inputs=[sentence, output], training=False)
predictions
# 현재 시점의 예측 단어를 받아온다.
= predictions[:, -1:, :]
predictions = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)
predicted_id
# 만약 현재 시점의 예측 단어가 종료 토큰이라면 예측을 중단
if tf.equal(predicted_id, END_TOKEN[0]):
break
# 현재 시점의 예측 단어를 output(출력)에 연결한다.
# output은 for문의 다음 루프에서 디코더의 입력이 된다.
= tf.concat([output, predicted_id], axis=-1)
output
# 단어 예측이 모두 끝났다면 output을 리턴.
return tf.squeeze(output, axis=0)
def predict(sentence):
= evaluate(sentence)
prediction
# prediction == 디코더가 리턴한 챗봇의 대답에 해당하는 정수 시퀀스
# tokenizer.decode()를 통해 정수 시퀀스를 문자열로 디코딩.
= tokenizer.decode(
predicted_sentence for i in prediction if i < tokenizer.vocab_size])
[i
print('Input: {}'.format(sentence))
print('Output: {}'.format(predicted_sentence))
return predicted_sentence
= predict("영화 볼래?") output
Input: 영화 볼래?
Output: 하루가 또 가네요 .
= predict("고민이 있어") output
Input: 고민이 있어
Output: 하루가 또 가네요 .
= predict("카페갈래?") output
Input: 카페갈래?
Output: 하루가 또 가네요 .
= predict("게임하고싶당") output
Input: 게임하고싶당
Output: 하루가 또 가네요 .
= predict("게임하자") output
Input: 게임하자
Output: 하루가 또 가네요 .
터진다..