-
Notifications
You must be signed in to change notification settings - Fork 597
/
data_frame.py
104 lines (80 loc) · 3.56 KB
/
data_frame.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import copy
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
class DataFrame(object):
    """Minimal pd.DataFrame analog for handling n-dimensional numpy matrices with additional
    support for shuffling, batching, and train/test splitting.

    Args:
        columns: List of names corresponding to the matrices in data.
        data: List of n-dimensional data matrices ordered in correspondence with columns.
            All matrices must have the same leading dimension.  Data can also be fed a list of
            instances of np.memmap, in which case RAM usage can be limited to the size of a
            single batch.

    Attributes:
        length: Shared size of the leading dimension of every matrix.
        columns: The column names (the list passed in is stored, not copied).
        data: The matrices (the list passed in is stored, not copied).
        dict: Mapping from column name to matrix.
        idx: Row-order permutation used by batching, integer indexing and iterrows;
            shuffled in place by shuffle().
    """

    def __init__(self, columns, data):
        assert len(columns) == len(data), 'columns length does not match data length'

        lengths = [mat.shape[0] for mat in data]
        assert len(set(lengths)) == 1, 'all matrices in data must have same first dimension'

        self.length = lengths[0]
        self.columns = columns
        self.data = data
        self.dict = dict(zip(self.columns, self.data))
        self.idx = np.arange(self.length)

    def shapes(self):
        """Return a pd.Series mapping each column name to its matrix shape."""
        return pd.Series(dict(zip(self.columns, [mat.shape for mat in self.data])))

    def dtypes(self):
        """Return a pd.Series mapping each column name to its matrix dtype."""
        return pd.Series(dict(zip(self.columns, [mat.dtype for mat in self.data])))

    def shuffle(self):
        """Shuffle the row-order index in place; the matrices themselves are not moved."""
        np.random.shuffle(self.idx)

    def train_test_split(self, train_size, random_state=None, stratify=None):
        """Split the rows into a train DataFrame and a test DataFrame.

        Args:
            train_size: Forwarded to sklearn's train_test_split.
            random_state: Seed forwarded to sklearn's train_test_split.  When None, a fresh
                seed is drawn from np.random on every call.
            stratify: Forwarded to sklearn's train_test_split.

        Returns:
            Tuple (train_df, test_df) of new DataFrame instances.
        """
        # A default of `np.random.randint(1000)` in the signature would be evaluated only
        # once, at class-definition time, silently reusing one seed for every call.
        if random_state is None:
            random_state = np.random.randint(1000)

        train_idx, test_idx = train_test_split(
            self.idx,
            train_size=train_size,
            random_state=random_state,
            stratify=stratify
        )
        train_df = DataFrame(copy.copy(self.columns), [mat[train_idx] for mat in self.data])
        test_df = DataFrame(copy.copy(self.columns), [mat[test_idx] for mat in self.data])
        return train_df, test_df

    def batch_generator(self, batch_size, shuffle=True, num_epochs=10000, allow_smaller_final_batch=False):
        """Yield consecutive batches of rows as new DataFrame instances.

        Args:
            batch_size: Number of rows per batch.
            shuffle: If True, reshuffle the row order (in place) at the start of each epoch.
            num_epochs: Number of passes over the data.
            allow_smaller_final_batch: If True, the last batch of an epoch may hold fewer
                than batch_size rows; otherwise the incomplete remainder is dropped.

        Yields:
            DataFrame holding copies of the selected rows of every column.
        """
        epoch_num = 0
        while epoch_num < num_epochs:
            if shuffle:
                self.shuffle()

            # Stop at self.length (not self.length + 1): otherwise a length that is an exact
            # multiple of batch_size produces an extra, empty batch at the end of the epoch.
            for i in range(0, self.length, batch_size):
                batch_idx = self.idx[i: i + batch_size]
                if not allow_smaller_final_batch and len(batch_idx) != batch_size:
                    break
                yield DataFrame(
                    columns=copy.copy(self.columns),
                    data=[mat[batch_idx].copy() for mat in self.data]
                )

            epoch_num += 1

    def iterrows(self):
        """Yield every row as a pd.Series, following the current row order (self.idx)."""
        # Iterate over positions; __getitem__ already maps a position through self.idx, so
        # iterating over the values of self.idx would apply the permutation twice.
        for i in range(self.length):
            yield self[i]

    def mask(self, mask):
        """Return a new DataFrame with every matrix indexed by mask along the first axis."""
        return DataFrame(copy.copy(self.columns), [mat[mask] for mat in self.data])

    def concat(self, other_df):
        """Return a new DataFrame with other_df's rows appended, column by column.

        other_df must provide every column of this DataFrame.
        """
        mats = [
            np.concatenate([self[column], other_df[column]], axis=0)
            for column in self.columns
        ]
        return DataFrame(copy.copy(self.columns), mats)

    def items(self):
        """Return the (column name, matrix) pairs."""
        return self.dict.items()

    def __iter__(self):
        """Iterate over (column name, matrix) pairs."""
        return iter(self.dict.items())

    def __len__(self):
        """Return the number of rows."""
        return self.length

    def __getitem__(self, key):
        """Look up a column by name or a row by position.

        Args:
            key: A str selects the whole matrix of that column.  An integer (built-in int
                or numpy integer) selects the row at that position of the current row
                order and returns it as a pd.Series keyed by column name.

        Raises:
            KeyError: If key is a str that is not a column name.
            TypeError: If key is neither a str nor an integer.
        """
        if isinstance(key, str):
            return self.dict[key]

        # numpy integer scalars (e.g. elements of self.idx) are not instances of int.
        if isinstance(key, (int, np.integer)):
            return pd.Series(dict(zip(self.columns, [mat[self.idx[key]] for mat in self.data])))

        raise TypeError(
            'DataFrame indices must be str or int, not {}'.format(type(key).__name__)
        )

    def __setitem__(self, key, value):
        """Add a new column, or replace the matrix of an existing one.

        Args:
            key: Column name.
            value: Matrix whose first dimension equals len(self).
        """
        assert value.shape[0] == len(self), 'matrix first dimension does not match'

        if key in self.columns:
            # Keep self.data in sync with self.dict; batching, masking and splitting all
            # read from self.data and would otherwise keep using the stale matrix.
            self.data[self.columns.index(key)] = value
        else:
            self.columns.append(key)
            self.data.append(value)
        self.dict[key] = value