-
Notifications
You must be signed in to change notification settings - Fork 6
/
learn_from_data.py
133 lines (105 loc) · 4.72 KB
/
learn_from_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# This script reads a saved numpy array with features prepared for sklearn.
# The features are then used to learn something from the data.
__author__ = "Johannes Bjerva and Malvina Nissim"
__credits__ = ["Johannes Bjerva", "Malvina Nissim"]
__license__ = "GPL v3"
__version__ = "0.3 (31/08/2020)"
__maintainer__ = "Mike Zhang"
__email__ = "mikz@itu.dk"
__status__ = "Testing"
import argparse
import logging
import random
from collections import Counter
from typing import List, Tuple, Union
import numpy as np
import pandas as pd
from sklearn.metrics import (accuracy_score, classification_report,
confusion_matrix)
from sklearn.naive_bayes import MultinomialNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import LinearSVC
from sklearn.tree import DecisionTreeClassifier
# random.seed(1337)
logging.basicConfig(format='%(levelname)s %(message)s', level=logging.DEBUG)
def read_features(fname: str) -> Tuple[np.ndarray, np.ndarray]:
    """Load the feature matrix ``X`` and label vector ``y`` from a .npz file.

    The archive must contain the keys ``'X'`` and ``'y'`` (as written by the
    companion feature-extraction script).
    """
    with open(fname, 'rb') as in_f:
        # Index the archive while the file is still open: npz members are
        # lazily loaded from the underlying file handle.
        archive = np.load(in_f)
        features, labels = archive['X'], archive['y']
    return features, labels
def make_splits(X: np.ndarray,
                y: np.ndarray,
                args: argparse.Namespace) -> Tuple[List, List, List, List, List, List]:
    """Shuffle ``(X, y)`` in unison and split into train/dev/test sets.

    ``args.split`` holds two percentage strings: ``split[0]`` is the share
    of all data used for train+dev (the remainder is the test set), and
    ``split[1]`` is the share of that train+dev portion held out as dev.

    Returns exactly the six lists the caller unpacks:
    ``(train_X, train_y, dev_X, dev_y, test_X, test_y)``.

    Fixes two defects in the original:
      * it returned 8 values, so the caller's 6-way unpacking raised
        ``ValueError`` (the annotation also promised 6);
      * it assigned the *smaller* ``dev_split`` slice to train and the
        larger remainder to dev, inverting the intended split.
    """
    train_frac = float(args.split[0]) / 100.0
    dev_frac = float(args.split[1]) / 100.0

    # Shuffle features and labels together so pairs stay aligned.
    combined = list(zip(list(X), list(y)))
    random.shuffle(combined)
    X, y = (list(part) for part in zip(*combined))

    train_split = int(len(y) * train_frac)      # size of train+dev
    dev_split = int(train_split * dev_frac)     # size of the dev set

    train_and_dev_X, train_and_dev_y = X[:train_split], y[:train_split]
    dev_X, train_X = train_and_dev_X[:dev_split], train_and_dev_X[dev_split:]
    dev_y, train_y = train_and_dev_y[:dev_split], train_and_dev_y[dev_split:]
    test_X, test_y = X[train_split:], y[train_split:]
    return train_X, train_y, dev_X, dev_y, test_X, test_y
def baseline(train_y: List[Union[int, str]], test_y: List[Union[int, str]]) -> None:
    """Log a majority-class baseline.

    The baseline is the accuracy obtained on ``test_y`` by always predicting
    the label that occurs most often in ``train_y``.
    """
    most_common = Counter(train_y).most_common()[0][0]
    hits = sum(1 for label in test_y if label == most_common)
    baseline = hits / float(len(test_y))
    logging.info(f'Most frequent label: {most_common}')
    logging.info(f'Baseline accuracy: {baseline}')
def get_classifiers(args: argparse.Namespace) -> List[object]:
    """Instantiate one sklearn classifier per name in ``args.algorithms``.

    Recognised names: 'nb' (naive Bayes), 'dt' (decision tree),
    'svm' (linear SVM), 'knn' (k-nearest neighbours).  Unknown names are
    silently ignored; the returned list preserves the order above.
    """
    # Dispatch table: insertion order matches the original if-chain order.
    factories = {
        'nb': lambda: MultinomialNB(),
        'dt': lambda: DecisionTreeClassifier(
            random_state=0,
            criterion='entropy',
            min_samples_leaf=args.min_samples,
            max_leaf_nodes=args.max_nodes),
        'svm': lambda: LinearSVC(max_iter=500, random_state=0),
        'knn': lambda: KNeighborsClassifier(n_neighbors=args.k),
    }
    return [make() for name, make in factories.items()
            if name in args.algorithms]
def evaluate_classifier(clf: object,
                        test_X: List[Union[int, str]],
                        test_y: List[Union[int, str]],
                        args: argparse.Namespace) -> str:
    """Predict on the test set and summarise accuracy.

    Returns a human-readable summary string (the original annotation said
    ``None``, but a string has always been returned).  When ``--cm`` or
    ``--plot`` is set, also shows the confusion matrix.
    """
    preds = clf.predict(test_X)
    # sklearn's convention is accuracy_score(y_true, y_pred); accuracy is
    # symmetric, so this changes the idiom, not the value.
    accuracy = accuracy_score(test_y, preds)
    if args.cm or args.plot:
        show_confusion_matrix(test_y, preds, args)
    return f'Accuracy: {accuracy}, classifier: {clf}'
def show_confusion_matrix(test_y, pred_y, args):
    """Print (and optionally plot) a confusion matrix and a per-class
    classification report for the given gold/predicted labels.

    ``args.norm`` row-normalises the matrix, ``args.cm`` prints it as a
    DataFrame, ``args.plot`` draws it via the project's plotting helper.
    """
    # Hoist the label order: the original recomputed sorted(set(test_y))
    # three times.
    labels = sorted(set(test_y))
    cm = confusion_matrix(test_y, pred_y, labels=labels)
    if args.norm:
        # Row-normalise so each row sums to 1.  No 0/0 risk: every label
        # comes from test_y itself, so every row has at least one count.
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        np.set_printoptions(precision=2)
    logging.debug('Showing Confusion Matrix')
    if args.cm:
        print(f'\n{pd.DataFrame(cm, index=labels, columns=labels)}\n')
    if args.plot:
        from plotting import plot_confusion_matrix  # Import here due to potential matplotlib issues
        plot_confusion_matrix(cm, test_y)
    print(classification_report(test_y, pred_y, labels=labels))
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--npz', help='feature npz filename', type=str)
    parser.add_argument('--algorithms', help='ml algorithms', nargs='+', required=True)
    # Options read by make_splits/get_classifiers but previously never
    # registered, which made every run crash with AttributeError
    # (args.split, args.min_samples, args.max_nodes, args.k).
    parser.add_argument('--split', help='train and test percentages, e.g. 80 20',
                        nargs=2, default=['80', '20'])
    parser.add_argument('--min_samples', help='min samples per decision-tree leaf',
                        type=int, default=1)
    parser.add_argument('--max_nodes', help='max leaf nodes of the decision tree',
                        type=int, default=None)
    parser.add_argument('--k', help='number of neighbours for knn',
                        type=int, default=5)
    parser.add_argument('--plot', help='Show plot', action='store_true')
    parser.add_argument('--cm', help='Show confusion matrix', action='store_true')
    parser.add_argument('--norm', help='Normalise confusion matrix', action='store_true')
    args = parser.parse_args()

    X, y = read_features(args.npz)
    train_X, train_y, dev_X, dev_y, test_X, test_y = make_splits(X, y, args)
    baseline(train_y, test_y)

    classifiers = get_classifiers(args)
    for clf in classifiers:
        clf.fit(train_X, train_y)
        # evaluate_classifier returns a summary string; log it instead of
        # silently discarding it as the original did.
        logging.info(evaluate_classifier(clf, test_X, test_y, args))