-
Notifications
You must be signed in to change notification settings - Fork 0
/
bleu.py
377 lines (325 loc) · 14.6 KB
/
bleu.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
import math
import sys
from fractions import Fraction
import warnings
from collections import Counter
from nltk.util import ngrams
def sentence_bleu(
    references,
    hypothesis,
    weights=(0.25, 0.25, 0.25, 0.25),
    smoothing_function=None,
    auto_reweigh=False,
):
    """
    Score a single hypothesis against its references.

    The sentence is wrapped into a one-element corpus and handed to
    ``corpus_bleu``; every other argument is forwarded unchanged.

    :param references: reference sentences for this hypothesis
    :type references: list(list(str))
    :param hypothesis: the hypothesis sentence
    :type hypothesis: list(str)
    :param weights: weights for unigrams, bigrams, trigrams and so on
    :type weights: list(float)
    :param smoothing_function: smoothing callable, forwarded to ``corpus_bleu``
    :param auto_reweigh: forwarded to ``corpus_bleu``
    :type auto_reweigh: bool
    :return: whatever ``corpus_bleu`` returns for the one-sentence corpus
    """
    single_sentence_corpus = {
        "list_of_references": [references],
        "hypotheses": [hypothesis],
    }
    return corpus_bleu(
        weights=weights,
        smoothing_function=smoothing_function,
        auto_reweigh=auto_reweigh,
        **single_sentence_corpus,
    )
def corpus_bleu(
    list_of_references,
    hypotheses,
    weights=(0.25, 0.25, 0.25, 0.25),
    smoothing_function=None,
    auto_reweigh=False,
):
    """
    Calculate a single corpus-level BLEU score for all the hypotheses and
    their respective references.

    N-gram matches and totals are summed over the whole corpus before the
    division (micro-average), rather than averaging sentence-level scores.

    :param list_of_references: a corpus of lists of reference sentences, w.r.t. hypotheses
    :type list_of_references: list(list(list(str)))
    :param hypotheses: a list of hypothesis sentences
    :type hypotheses: list(list(str))
    :param weights: weights for unigrams, bigrams, trigrams and so on
    :type weights: list(float)
    :param smoothing_function: callable applied to the list of precisions;
        ``SmoothingFunction().method0`` is used when this is falsy
    :type smoothing_function: SmoothingFunction
    :param auto_reweigh: Option to re-normalize the weights uniformly.
    :type auto_reweigh: bool
    :return: The corpus-level BLEU score. ``0`` is returned when no unigram
        matches at all (which includes an empty corpus).
    :rtype: float
    :raises AssertionError: if the number of hypotheses differs from the
        number of reference lists.
    """
    p_numerators = Counter()  # Key = ngram order, and value = no. of ngram matches.
    p_denominators = Counter()  # Key = ngram order, and value = no. of ngram in hyp.
    hyp_lengths, ref_lengths = 0, 0

    assert len(list_of_references) == len(hypotheses), (
        "The number of hypotheses and their reference(s) should be the " "same "
    )

    orders = range(1, len(weights) + 1)

    # Iterate through each hypothesis and their corresponding references.
    for references, hypothesis in zip(list_of_references, hypotheses):
        # For each order of ngram, accumulate the numerator and
        # denominator for the corpus-level modified precision.
        for i in orders:
            p_i = modified_precision(references, hypothesis, i)
            p_numerators[i] += p_i.numerator
            p_denominators[i] += p_i.denominator

        # Accumulate the hypothesis length and the closest reference length.
        hyp_len = len(hypothesis)
        hyp_lengths += hyp_len
        ref_lengths += closest_ref_length(references, hyp_len)

    # Returns 0 if there's no matching n-grams.
    # We only need to check for p_numerators[1] == 0, since if there's
    # no unigrams, there won't be any higher order ngrams.
    # This check must come before the re-weighting and the Fraction
    # construction below: both divide by quantities that are zero for an
    # empty hypothesis / empty corpus.
    if p_numerators[1] == 0:
        return 0

    # Calculate corpus-level brevity penalty.
    bp = brevity_penalty(ref_lengths, hyp_lengths)

    # Uniformly re-weighting based on maximum hypothesis lengths if largest
    # order of n-grams < 4 and weights is set at default.
    # hyp_lengths >= 1 here because at least one unigram matched.
    if auto_reweigh:
        if hyp_lengths < 4 and weights == (0.25, 0.25, 0.25, 0.25):
            weights = (1 / hyp_lengths,) * hyp_lengths

    # Collects the various precision values for the different ngram orders.
    p_n = [
        _corpus_fraction(p_numerators[i], p_denominators[i])
        for i in range(1, len(weights) + 1)
    ]

    # If there's no smoothing, use method0 from the SmoothingFunction class.
    if not smoothing_function:
        smoothing_function = SmoothingFunction().method0

    # Smoothen the modified precision.
    # Note: smoothing_function() may convert values into floats;
    #       it tries to retain the Fraction object as much as the
    #       smoothing method allows.
    # ``references`` / ``hypothesis`` are the loop variables left over from
    # the last corpus entry; ``hyp_len`` is the corpus-level total.
    p_n = smoothing_function(
        p_n, references=references, hypothesis=hypothesis, hyp_len=hyp_lengths
    )

    # Weighted geometric mean of the precisions, computed in log space.
    log_terms = (w_i * math.log(p_i) for w_i, p_i in zip(weights, p_n))
    return bp * math.exp(math.fsum(log_terms))


def _corpus_fraction(numerator, denominator):
    """
    Build ``numerator/denominator`` as a Fraction WITHOUT reducing it.

    The raw counts must survive because smoothing functions read
    ``.numerator`` / ``.denominator`` directly.

    :raises ZeroDivisionError: if ``denominator`` is 0.
    """
    try:
        return Fraction(numerator, denominator, _normalize=False)
    except TypeError:
        # Python 3.12 removed the private ``_normalize`` keyword, so fill
        # the two slots of a fresh Fraction by hand instead.
        if denominator == 0:
            raise ZeroDivisionError("Fraction(%s, 0)" % numerator) from None
        fraction = Fraction.__new__(Fraction)
        fraction._numerator = numerator
        fraction._denominator = denominator
        return fraction
def modified_precision(references, hypothesis, n):
    """
    Calculate the modified (clipped) n-gram precision of one hypothesis.

    Each hypothesis n-gram count is clipped to the largest count of that
    n-gram found in any single reference; the clipped total is divided by
    the number of n-grams in the hypothesis.

    :param references: A list of reference translations.
    :type references: list(list(str))
    :param hypothesis: A hypothesis translation.
    :type hypothesis: list(str)
    :param n: The ngram order.
    :type n: int
    :return: clipped matches over hypothesis n-gram count, as a Fraction
        whose numerator and denominator are NOT reduced, so callers can sum
        the raw counts across sentences.
    :rtype: Fraction
    """
    # Extracts all ngrams in hypothesis.
    # Use an empty Counter if the hypothesis is shorter than n.
    counts = Counter(ngrams(hypothesis, n)) if len(hypothesis) >= n else Counter()

    # For every hypothesis ngram, the maximum count seen in any one reference.
    max_counts = {}
    for reference in references:
        reference_counts = (
            Counter(ngrams(reference, n)) if len(reference) >= n else Counter()
        )
        for ngram in counts:
            max_counts[ngram] = max(max_counts.get(ngram, 0), reference_counts[ngram])

    # Assigns the intersection between hypothesis and references' counts.
    # ``.get`` keeps this well-defined (zero matches) when there are no
    # references at all.
    clipped_counts = {
        ngram: min(count, max_counts.get(ngram, 0)) for ngram, count in counts.items()
    }

    numerator = sum(clipped_counts.values())
    # Ensures that denominator is minimum 1 to avoid ZeroDivisionError.
    # Usually this happens when the ngram order is > len(hypothesis).
    denominator = max(1, sum(counts.values()))

    # Both values are forced to plain ints before building the fraction.
    return _raw_fraction(int(numerator), int(denominator))


def _raw_fraction(numerator, denominator):
    """
    Build ``numerator/denominator`` as a Fraction WITHOUT reducing it.

    :raises ZeroDivisionError: if ``denominator`` is 0.
    """
    try:
        return Fraction(numerator, denominator, _normalize=False)
    except TypeError:
        # Python 3.12 removed the private ``_normalize`` keyword, so fill
        # the two slots of a fresh Fraction by hand instead.
        if denominator == 0:
            raise ZeroDivisionError("Fraction(%s, 0)" % numerator) from None
        fraction = Fraction.__new__(Fraction)
        fraction._numerator = numerator
        fraction._denominator = denominator
        return fraction
def closest_ref_length(references, hyp_len):
    """
    Pick the reference length that is nearest to the hypothesis length.

    This is the *r* variable of the brevity penalty formula in
    Papineni et. al. (2002). When two references are equally far from the
    hypothesis length, the shorter one wins.

    :param references: A list of reference translations.
    :type references: list(list(str))
    :param hyp_len: The length of the hypothesis.
    :type hyp_len: int
    :return: The length of the reference that's closest to the hypothesis.
    :rtype: int
    """

    def distance_then_length(ref_len):
        # Primary key: distance to the hypothesis; tie-break: shorter length.
        return abs(ref_len - hyp_len), ref_len

    return min(map(len, references), key=distance_then_length)
def brevity_penalty(closest_ref_len, hyp_len):
    """
    Compute the brevity penalty for a hypothesis length.

    :param closest_ref_len: reference length to compare against
    :type closest_ref_len: int
    :param hyp_len: hypothesis length
    :type hyp_len: int
    :return: ``1`` when the hypothesis is longer than the reference, ``0``
        when the hypothesis is empty, otherwise
        ``exp(1 - closest_ref_len / hyp_len)``.
    """
    # A hypothesis longer than the reference is not penalised.
    if hyp_len > closest_ref_len:
        return 1
    # If hypothesis is empty, brevity penalty = 0 should result in BLEU = 0.0
    if hyp_len == 0:
        return 0
    length_ratio = closest_ref_len / hyp_len
    return math.exp(1 - length_ratio)
class SmoothingFunction:
    """
    This is an implementation of the smoothing techniques
    for segment-level BLEU scores that was presented in
    Boxing Chen and Collin Cherry (2014) A Systematic Comparison of
    Smoothing Techniques for Sentence-Level BLEU. In WMT14.
    http://acl2014.org/acl2014/W14-33/pdf/W14-3346.pdf

    Every ``methodN`` takes the list of per-order precisions ``p_n`` (items
    exposing ``.numerator`` / ``.denominator``) and returns the smoothed
    list. ``method3`` to ``method7`` overwrite entries of the list they are
    given and return that same list.
    """

    def __init__(self, epsilon=0.1, alpha=5, k=5):
        """
        This will initialize the parameters required for the various smoothing
        techniques, the default values are set to the numbers used in the
        experiments from Chen and Cherry (2014).

        :param epsilon: the epsilon value use in method 1
        :type epsilon: float
        :param alpha: the alpha value use in method 6
        :type alpha: int
        :param k: the k value use in method 4
        :type k: int
        """
        self.epsilon = epsilon
        self.alpha = alpha
        self.k = k

    @staticmethod
    def _unnormalized_fraction(numerator, denominator):
        """
        Build ``numerator/denominator`` as a Fraction WITHOUT reducing it.

        :raises ZeroDivisionError: if ``denominator`` is 0.
        """
        try:
            return Fraction(numerator, denominator, _normalize=False)
        except TypeError:
            # Python 3.12 removed the private ``_normalize`` keyword, so
            # fill the two slots of a fresh Fraction by hand instead.
            if denominator == 0:
                raise ZeroDivisionError("Fraction(%s, 0)" % numerator) from None
            fraction = Fraction.__new__(Fraction)
            fraction._numerator = numerator
            fraction._denominator = denominator
            return fraction

    def method0(self, p_n, *args, **kwargs):
        """
        No smoothing.

        Precisions with a zero numerator are replaced by
        ``sys.float_info.min`` and a warning is emitted for each of them.
        Returns a new list.
        """
        p_n_new = []
        for i, p_i in enumerate(p_n):
            if p_i.numerator != 0:
                p_n_new.append(p_i)
            else:
                _msg = str(
                    "\nThe hypothesis contains 0 counts of {}-gram overlaps.\n"
                    "Therefore the BLEU score evaluates to 0, independently of\n"
                    "how many N-gram overlaps of lower order it contains.\n"
                    "Consider using lower n-gram order or use "
                    "SmoothingFunction()"
                ).format(i + 1)
                warnings.warn(_msg)
                # A zero precision cannot go through math.log(). Using the
                # smallest positive float instead gives a very large
                # negative log, so the geometric mean collapses to ~0.
                p_n_new.append(sys.float_info.min)
        return p_n_new

    def method1(self, p_n, *args, **kwargs):
        """
        Smoothing method 1: Add *epsilon* counts to precision with 0 counts.

        Returns a new list; smoothed entries are floats.
        """
        return [
            (p_i.numerator + self.epsilon) / p_i.denominator
            if p_i.numerator == 0
            else p_i
            for p_i in p_n
        ]

    def method2(self, p_n, *args, **kwargs):
        """
        Smoothing method 2: Add 1 to both numerator and denominator from
        Chin-Yew Lin and Franz Josef Och (2004) ORANGE: a Method for
        Evaluating Automatic Evaluation Metrics for Machine Translation.
        In COLING 2004.

        The first entry (unigram precision) is left untouched. Returns a
        new list whose smoothed entries are unreduced Fractions.
        """
        return [
            p_i
            if i == 0
            else self._unnormalized_fraction(p_i.numerator + 1, p_i.denominator + 1)
            for i, p_i in enumerate(p_n)
        ]

    def method3(self, p_n, *args, **kwargs):
        """
        Smoothing method 3: NIST geometric sequence smoothing
        The smoothing is computed by taking 1 / ( 2^k ), instead of 0, for each
        precision score whose matching n-gram count is null.
        k is 1 for the first 'n' value for which the n-gram match count is null.

        For example, if the text contains:

        - one 2-gram match
        - and (consequently) two 1-gram matches

        the n-gram count for each individual precision score would be:

        - n=1  =>  prec_count = 2     (two unigrams)
        - n=2  =>  prec_count = 1     (one bigram)
        - n=3  =>  prec_count = 1/2   (no trigram,  taking 'smoothed' value of 1 / ( 2^k ), with k=1)
        - n=4  =>  prec_count = 1/4   (no fourgram, taking 'smoothed' value of 1 / ( 2^k ), with k=2)

        Overwrites the zero entries of ``p_n`` in place and returns ``p_n``.
        """
        incvnt = 1  # From the mteval-v13a.pl, it's referred to as k.
        for i, p_i in enumerate(p_n):
            if p_i.numerator == 0:
                p_n[i] = 1 / (2 ** incvnt * p_i.denominator)
                incvnt += 1
        return p_n

    def method4(self, p_n, references, hypothesis, hyp_len=None, *args, **kwargs):
        """
        Smoothing method 4:
        Shorter translations may have inflated precision values due to having
        smaller denominators; therefore, we give them proportionally
        smaller smoothed counts. The j-th zero-count precision (j = 1, 2, ...)
        gets the smoothed count ``1 / (2^j * k / ln(len(T)))``, where T is the
        translation, divided by that precision's denominator.

        Nothing is smoothed when the hypothesis length is 1 or less, because
        ``ln(1) == 0`` would make the expression divide by zero.

        Overwrites the zero entries of ``p_n`` in place and returns ``p_n``.

        :param hyp_len: hypothesis length; ``len(hypothesis)`` is used when
            this is falsy.
        """
        incvnt = 1
        hyp_len = hyp_len if hyp_len else len(hypothesis)
        for i, p_i in enumerate(p_n):
            if p_i.numerator == 0 and hyp_len > 1:
                # Note that self.k is different from the k from NIST.
                numerator = 1 / (2 ** incvnt * self.k / math.log(hyp_len))
                p_n[i] = numerator / p_i.denominator
                incvnt += 1
        return p_n

    def method5(self, p_n, references, hypothesis, hyp_len=None, *args, **kwargs):
        """
        Smoothing method 5:
        The matched counts for similar values of n should be similar. To
        calculate the n-gram matched count, it averages the n-1, n and n+1 gram
        matched counts.

        The "n-1" term is the already-smoothed previous value (seeded with
        ``p_n[0] + 1``); the "n+1" term is the unsmoothed next precision.
        Overwrites ``p_n`` in place and returns it.
        """
        m = {}
        # Requires a precision value for one additional ngram order: the
        # order right above the highest one in p_n (5 for the default four
        # weights). The concatenation makes a copy, so the "n+1" look-ups
        # below still see the unsmoothed values.
        p_n_plus1 = p_n + [modified_precision(references, hypothesis, len(p_n) + 1)]
        m[-1] = p_n[0] + 1
        for i, p_i in enumerate(p_n):
            p_n[i] = (m[i - 1] + p_i + p_n_plus1[i + 1]) / 3
            m[i] = p_n[i]
        return p_n

    def method6(self, p_n, references, hypothesis, hyp_len=None, *args, **kwargs):
        """
        Smoothing method 6:
        Interpolates the maximum likelihood estimate of the precision *p_n* with
        a prior estimate *pi0*. The prior is estimated by assuming that the ratio
        between pn and pn-1 will be the same as that between pn-1 and pn-2; from
        Gao and He (2013) Training MRF-Based Phrase Translation Models using
        Gradient Ascent. In NAACL.

        The first two orders are left as they are. Overwrites the remaining
        entries of ``p_n`` in place and returns ``p_n``.

        :raises AssertionError: if ``p_n[2]`` is falsy.
        """
        # This smoothing only works when the lower-order precisions are
        # non-zero; fail with an explicit message when the input is too
        # short to use this smoothing technique.
        assert p_n[2], "This smoothing method requires non-zero precision for bigrams."
        for i, p_i in enumerate(p_n):
            if i in [0, 1]:  # Skips the first 2 orders of ngrams.
                continue
            pi0 = 0 if p_n[i - 2] == 0 else p_n[i - 1] ** 2 / p_n[i - 2]
            # No. of ngrams in translation that matches the reference.
            m = p_i.numerator
            # No. of ngrams in translation.
            ngram_total = sum(1 for _ in ngrams(hypothesis, i + 1))
            # Calculates the interpolated precision.
            p_n[i] = (m + self.alpha * pi0) / (ngram_total + self.alpha)
        return p_n

    def method7(self, p_n, references, hypothesis, hyp_len=None, *args, **kwargs):
        """
        Smoothing method 7:
        Interpolates methods 4 and 5: applies ``method4`` and then ``method5``
        to its result.
        """
        hyp_len = hyp_len if hyp_len else len(hypothesis)
        p_n = self.method4(p_n, references, hypothesis, hyp_len)
        p_n = self.method5(p_n, references, hypothesis, hyp_len)
        return p_n
def calc_BLEU(pred_sentence, GT_sentence, basepath='D:/21-ML data/codes/SLT_Transformer/bestmodel/'):
    """
    Corpus-level BLEU for whitespace-separated sentence strings.

    Each predicted sentence is paired by position with the ground-truth
    sentence at the same index, both are split on whitespace, and the
    result is scored with ``corpus_bleu`` using its default settings.
    The two input lengths are printed before scoring.

    :param pred_sentence: predicted sentences, one string per sentence
    :param GT_sentence: ground-truth sentences, one string per sentence;
        exactly one reference is used per prediction
    :param basepath: not used by this function
    :return: the value returned by ``corpus_bleu``
    """
    hyp_list = []  # list(list(str))
    ref_list = []  # list(list(list(str)))
    for position, predicted in enumerate(pred_sentence):
        hyp_list.append(predicted.split())
        # Wrap the single reference in a list: corpus_bleu expects a list of
        # references for every hypothesis.
        ref_list.append([GT_sentence[position].split()])

    print("hypothesis, reference length = ", len(pred_sentence), len(GT_sentence))
    return corpus_bleu(ref_list, hyp_list)
def main():
    """
    Demo: print the BLEU-4 score of one hard-coded hypothesis/reference pair.
    """
    hypothesis = ['<SOS> wetter teilweise und luft bestimmt das wetter in deutschland']
    references = ['kalte teilweise feuchte luft bestimmt das wetter in deutschland']

    # Disabled alternative that loaded the sentences from saved model output
    # (text before " ." on every line of each file):
    #   basepath = 'D:/21-ML data/codes/SLT_Transformer/bestmodel/'
    #   with open(basepath+"TestPred_trial_4.txt", "r") as f:
    #       for i in f.readlines():
    #           hypothesis.append(i.split(" .")[0])
    #   with open(basepath+"TestGT_trial_4.txt", "r") as f:
    #       for i in f.readlines():
    #           references.append(i.split(" .")[0])

    hyp_list = []  # list(list(str))
    ref_list = []  # list(list(list(str)))
    for predicted, truth in zip(hypothesis, references):
        hyp_list.append(predicted.split())
        ref_list.append([truth.split()])

    print("hypothesis, reference length = ", len(hypothesis), len(references))
    score = corpus_bleu(ref_list, hyp_list)
    print("BLEU-4 : ", score)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()