# the script evaluates partial matches without producing any output file
"""Evaluate the model"""
import os
import torch
import utils
import random
import logging
import argparse
import numpy as np
from data_loader import DataLoader
from SequenceTagger import BertForSequenceTagging
from metrics import f1_score, get_entities, classification_report, accuracy_score
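# restrict the process to the first visible GPU; the device selection in the
# __main__ block still falls back to CPU if CUDA is unavailable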
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
parser = argparse.ArgumentParser()
parser.add_argument('--dataset', default='proto', help="Directory containing the dataset")
parser.add_argument('--seed', type=int, default=23, help="random seed for initialization")
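# Example invocation (a sketch, assuming the default repository layout with
# 'experiments/<dataset>/params.json' and 'data/<dataset>/' in place):
#   python evaluate.py --dataset proto --seed 23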
def evaluate(model, data_iterator, params, mark='Eval', verbose=False):
"""Evaluate the model on `steps` batches."""
# set model to evaluation mode
model.eval()
idx2tag = params.idx2tag
true_tags = []
pred_tags = []
# a running average object for loss
loss_avg = utils.RunningAverage()
for _ in range(params.eval_steps):
# fetch the next evaluation batch
batch_data, batch_token_starts, batch_tags = next(data_iterator)
batch_masks = batch_data.gt(0)
loss = model((batch_data, batch_token_starts), token_type_ids=None, attention_mask=batch_masks, labels=batch_tags)[0]
loss_avg.update(loss.item())
batch_output = model((batch_data, batch_token_starts), token_type_ids=None, attention_mask=batch_masks)[0] # shape: (batch_size, max_len, num_labels)
batch_output = batch_output.detach().cpu().numpy()
batch_tags = batch_tags.to('cpu').numpy()
pred_tags.extend([[idx2tag.get(idx) for idx in indices] for indices in np.argmax(batch_output, axis=2)])
true_tags.extend([[idx2tag.get(idx) if idx != -1 else 'O' for idx in indices] for indices in batch_tags])
assert len(pred_tags) == len(true_tags)
# logging loss, f1 and report
metrics = {}
f1 = f1_score(true_tags, pred_tags)
metrics['loss'] = loss_avg()
metrics['f1'] = f1
metrics_str = "; ".join("{}: {:05.2f}".format(k, v) for k, v in metrics.items())
logging.info("- {} metrics: ".format(mark) + metrics_str)
if verbose:
report_acc = accuracy_score(true_tags, pred_tags)
report = classification_report(true_tags, pred_tags)
logging.info(report_acc)
logging.info(report)
return metrics
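# Note: `evaluate` assumes the caller has already set `params.eval_steps`
# (as done in the __main__ block below) and that padded tag positions carry
# the index -1, which is mapped to the 'O' label before scoring.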
def interAct(model, data_iterator, params, mark='Interactive', verbose=False):
"""Evaluate the model on `steps` batches."""
# set model to evaluation mode
model.eval()
idx2tag = params.idx2tag
true_tags = []
pred_tags = []
# a running average object for loss
loss_avg = utils.RunningAverage()
batch_data, batch_token_starts = next(data_iterator)
batch_masks = batch_data.gt(0)
batch_output = model((batch_data, batch_token_starts), token_type_ids=None, attention_mask=batch_masks)[0] # shape: (batch_size, max_len, num_labels)
batch_output = batch_output.detach().cpu().numpy()
pred_tags.extend([[idx2tag.get(idx) for idx in indices] for indices in np.argmax(batch_output, axis=2)])
return(get_entities(pred_tags))
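# Note: `interAct` is not invoked in the __main__ block below; it expects an
# iterator yielding (batch_data, batch_token_starts) without gold tags and
# returns the entity spans produced by `get_entities`.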
if __name__ == '__main__':
    args = parser.parse_args()
    tagger_model_dir = 'experiments/' + args.dataset

    # Load the parameters from json file
    json_path = os.path.join(tagger_model_dir, 'params.json')
    assert os.path.isfile(json_path), "No json configuration file found at {}".format(json_path)
    params = utils.Params(json_path)

    # Use GPUs if available
    params.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Set the random seed for reproducible experiments
    random.seed(args.seed)
    torch.manual_seed(args.seed)
    params.seed = args.seed

    # Set the logger
    utils.set_logger(os.path.join(tagger_model_dir, 'evaluate.log'))

    # Create the input data pipeline
    logging.info("Loading the dataset...")

    # Initialize the DataLoader
    data_dir = 'data/' + args.dataset
    if args.dataset in ["proto"]:
        bert_class = 'dmis-lab/biobert-v1.1'  # auto
    elif args.dataset in ["msra"]:
        bert_class = 'dmis-lab/biobert-v1.1'  # auto

    data_loader = DataLoader(data_dir, bert_class, params, token_pad_idx=0, tag_pad_idx=-1)

    # Load the model
    model = BertForSequenceTagging.from_pretrained(tagger_model_dir)
    model.to(params.device)

    # Load data
    test_data = data_loader.load_data('test')

    # Specify the test set size
    params.test_size = test_data['size']
    params.eval_steps = params.test_size // params.batch_size
    test_data_iterator = data_loader.data_iterator(test_data, shuffle=False)
    logging.info("- done.")

    logging.info("Starting evaluation...")
    test_metrics = evaluate(model, test_data_iterator, params, mark='Test', verbose=True)
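# Caveat: `eval_steps` is computed with floor division, so a final partial batch
# (test_size % batch_size examples) is not scored; metrics are written to the
# configured log ('evaluate.log') rather than to a separate output file.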