model.py
import math
import random

import torch
import torch.nn.functional as F
from torch import nn
from torch.autograd import Variable


class Encoder(nn.Module):
    def __init__(self, input_size, embed_size, hidden_size,
                 n_layers=1, dropout=0.5):
        super(Encoder, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.embed_size = embed_size
        self.embed = nn.Embedding(input_size, embed_size)
        self.gru = nn.GRU(embed_size, hidden_size, n_layers,
                          dropout=dropout, bidirectional=True)

    def forward(self, src, hidden=None):
        embedded = self.embed(src)
        outputs, hidden = self.gru(embedded, hidden)
        # sum bidirectional outputs
        outputs = (outputs[:, :, :self.hidden_size] +
                   outputs[:, :, self.hidden_size:])
        return outputs, hidden
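

# A minimal usage sketch (not part of the original model.py): feeding the
# Encoder a toy batch to show the tensor shapes it returns. All sizes below
# are made-up illustrative values.
def _encoder_shape_demo():
    input_vocab, embed_dim, hidden_dim = 1000, 32, 64    # assumed toy sizes
    enc = Encoder(input_vocab, embed_dim, hidden_dim, n_layers=1, dropout=0.0)
    src = torch.randint(0, input_vocab, (7, 2))          # (seq_len=7, batch=2) token ids
    outputs, hidden = enc(src)
    # outputs: (7, 2, 64) - forward and backward GRU outputs summed
    # hidden:  (2, 2, 64) - (n_layers * num_directions, batch, hidden)
    return outputs.shape, hidden.shape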


class Attention(nn.Module):
    def __init__(self, hidden_size):
        super(Attention, self).__init__()
        self.hidden_size = hidden_size
        self.attn = nn.Linear(self.hidden_size * 2, hidden_size)
        self.v = nn.Parameter(torch.rand(hidden_size))
        stdv = 1. / math.sqrt(self.v.size(0))
        self.v.data.uniform_(-stdv, stdv)

    def forward(self, hidden, encoder_outputs):
        timestep = encoder_outputs.size(0)
        h = hidden.repeat(timestep, 1, 1).transpose(0, 1)
        encoder_outputs = encoder_outputs.transpose(0, 1)  # [B*T*H]
        attn_energies = self.score(h, encoder_outputs)
        return F.softmax(attn_energies, dim=1).unsqueeze(1)

    def score(self, hidden, encoder_outputs):
        # [B*T*2H]->[B*T*H]
        energy = F.relu(self.attn(torch.cat([hidden, encoder_outputs], 2)))
        energy = energy.transpose(1, 2)  # [B*H*T]
        v = self.v.repeat(encoder_outputs.size(0), 1).unsqueeze(1)  # [B*1*H]
        energy = torch.bmm(v, energy)  # [B*1*T]
        return energy.squeeze(1)  # [B*T]
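

# A minimal shape walkthrough (not part of the original model.py): driving the
# Attention module with random tensors in place of real decoder/encoder state.
# Dimensions are illustrative assumptions.
def _attention_shape_demo():
    hidden_dim, seq_len, batch = 64, 7, 2                # assumed toy sizes
    attn = Attention(hidden_dim)
    decoder_hidden = torch.randn(batch, hidden_dim)      # last decoder layer state (B,H)
    encoder_outputs = torch.randn(seq_len, batch, hidden_dim)  # (T,B,H)
    weights = attn(decoder_hidden, encoder_outputs)
    # weights: (batch, 1, seq_len); each row sums to 1 after the softmax
    return weights.shape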


class Decoder(nn.Module):
    def __init__(self, embed_size, hidden_size, output_size,
                 n_layers=1, dropout=0.2):
        super(Decoder, self).__init__()
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        self.embed = nn.Embedding(output_size, embed_size)
        self.dropout = nn.Dropout(dropout, inplace=True)
        self.attention = Attention(hidden_size)
        self.gru = nn.GRU(hidden_size + embed_size, hidden_size,
                          n_layers, dropout=dropout)
        self.out = nn.Linear(hidden_size * 2, output_size)

    def forward(self, input, last_hidden, encoder_outputs):
        # Get the embedding of the current input word (last output word)
        embedded = self.embed(input).unsqueeze(0)  # (1,B,N)
        embedded = self.dropout(embedded)
        # Calculate attention weights and apply to encoder outputs
        attn_weights = self.attention(last_hidden[-1], encoder_outputs)
        context = attn_weights.bmm(encoder_outputs.transpose(0, 1))  # (B,1,N)
        context = context.transpose(0, 1)  # (1,B,N)
        # Combine embedded input word and attended context, run through RNN
        rnn_input = torch.cat([embedded, context], 2)
        output, hidden = self.gru(rnn_input, last_hidden)
        output = output.squeeze(0)  # (1,B,N) -> (B,N)
        context = context.squeeze(0)
        output = self.out(torch.cat([output, context], 1))
        output = F.log_softmax(output, dim=1)
        return output, hidden, attn_weights
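

# A minimal single-step sketch (not part of the original model.py): one
# decoding step over random encoder outputs. The token index, vocabulary size
# and dimensions are illustrative assumptions.
def _decoder_step_demo():
    embed_dim, hidden_dim, vocab = 32, 64, 1000          # assumed toy sizes
    seq_len, batch = 7, 2
    dec = Decoder(embed_dim, hidden_dim, vocab, n_layers=1, dropout=0.0)
    token = torch.ones(batch, dtype=torch.long)          # e.g. an assumed <sos> index of 1
    last_hidden = torch.zeros(1, batch, hidden_dim)      # (n_layers, batch, hidden)
    encoder_outputs = torch.randn(seq_len, batch, hidden_dim)
    log_probs, hidden, attn_weights = dec(token, last_hidden, encoder_outputs)
    # log_probs: (batch, vocab), hidden: (1, batch, hidden),
    # attn_weights: (batch, 1, seq_len)
    return log_probs.shape, attn_weights.shape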


class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = src.size(1)
        max_len = trg.size(0)
        vocab_size = self.decoder.output_size
        outputs = Variable(torch.zeros(max_len, batch_size, vocab_size)).cuda()

        encoder_output, hidden = self.encoder(src)
        hidden = hidden[:self.decoder.n_layers]
        output = Variable(trg.data[0, :])  # sos
        for t in range(1, max_len):
            output, hidden, attn_weights = self.decoder(
                    output, hidden, encoder_output)
            outputs[t] = output
            is_teacher = random.random() < teacher_forcing_ratio
            top1 = output.data.max(1)[1]
            output = Variable(trg.data[t] if is_teacher else top1).cuda()
        return outputs
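

# A minimal end-to-end smoke test (not part of the original model.py).
# Seq2Seq.forward moves tensors to the GPU with .cuda(), so this only runs
# when CUDA is available; vocabulary sizes, dimensions and lengths below are
# illustrative assumptions.
if __name__ == "__main__" and torch.cuda.is_available():
    SRC_VOCAB, TRG_VOCAB, EMBED_DIM, HIDDEN_DIM = 1000, 1200, 32, 64
    encoder = Encoder(SRC_VOCAB, EMBED_DIM, HIDDEN_DIM, n_layers=1, dropout=0.0)
    decoder = Decoder(EMBED_DIM, HIDDEN_DIM, TRG_VOCAB, n_layers=1, dropout=0.0)
    model = Seq2Seq(encoder, decoder).cuda()
    src = torch.randint(0, SRC_VOCAB, (10, 4)).cuda()    # (src_len, batch) token ids
    trg = torch.randint(0, TRG_VOCAB, (12, 4)).cuda()    # (trg_len, batch) token ids
    outputs = model(src, trg, teacher_forcing_ratio=0.5)
    print(outputs.shape)                                 # (12, 4, TRG_VOCAB); step 0 stays zero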