-
Notifications
You must be signed in to change notification settings - Fork 0
/
data_utils.py
94 lines (79 loc) · 2.93 KB
/
data_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import dask.dataframe as dd
import dgl
import numpy as np
import scipy.sparse as ssp
import torch
import tqdm
# This is the train-test split method that most recommender system papers evaluated on MovieLens
# take. It essentially follows the intuition of "training on the past and predicting the future".
# One can also change the threshold so that the validation and test sets take larger proportions.
def train_test_split_by_time(df, timestamp, user):
    """Split interactions into train/validation/test sets per user, by time.

    Within each group of rows sharing the same ``user`` value, rows are
    ordered by ``timestamp``.  The latest row goes to the test set when the
    group has more than one row, and the second-latest row goes to the
    validation set when the group has more than two rows.  All other rows
    stay in the training set.

    Args:
        df: pandas DataFrame of interactions.  NOTE: the ``train_mask``,
            ``val_mask`` and ``test_mask`` columns are added to this object
            in place (initialised to "everything is training") before the
            split is computed on a dask copy.
        timestamp: Name of the column used to order each user's rows.
        user: Name of the column used to group rows.

    Returns:
        A tuple ``(train_indices, val_indices, test_indices)`` of NumPy
        arrays with the row positions (after sorting the result by index)
        at which the respective mask is set.
    """
    n_rows = len(df)
    df["train_mask"] = np.ones((n_rows,), dtype=np.bool_)
    df["val_mask"] = np.zeros((n_rows,), dtype=np.bool_)
    df["test_mask"] = np.zeros((n_rows,), dtype=np.bool_)

    # Describe the output schema to dask with an empty frame taken from the
    # real input, so the column names and dtypes always match the data
    # instead of assuming one particular dataset's columns.
    meta_df = df.head(0)
    ddf = dd.from_pandas(df, npartitions=10)

    def train_test_split(group):
        group = group.sort_values([timestamp])
        n_interactions = group.shape[0]
        # Look the mask columns up by name rather than assuming they are the
        # last three columns of the frame.
        train_col = group.columns.get_loc("train_mask")
        val_col = group.columns.get_loc("val_mask")
        test_col = group.columns.get_loc("test_mask")
        if n_interactions > 1:
            # Latest interaction is held out for testing.
            group.iloc[-1, train_col] = False
            group.iloc[-1, test_col] = True
        if n_interactions > 2:
            # Second-latest interaction is held out for validation.
            group.iloc[-2, train_col] = False
            group.iloc[-2, val_col] = True
        return group

    df = (
        ddf.groupby(user, group_keys=False)
        .apply(train_test_split, meta=meta_df)
        .compute(scheduler="processes")
        .sort_index()
    )
    # Debug output: show the resulting split for the first user in the frame.
    print(df[df[user] == df[user].unique()[0]].sort_values(timestamp))
    # NOTE(review): the returned positions equal the caller's row numbers only
    # if the input frame has a default 0..n-1 index -- confirm against callers.
    return (
        df["train_mask"].to_numpy().nonzero()[0],
        df["val_mask"].to_numpy().nonzero()[0],
        df["test_mask"].to_numpy().nonzero()[0],
    )
def build_train_graph(g, train_indices, utype, itype, etype, etype_rev):
    """Return the subgraph of ``g`` induced by the training edges.

    The same ``train_indices`` are selected for both ``etype`` and
    ``etype_rev``, and nodes are not relabelled.  Every node feature of ``g``
    is then assigned to the subgraph as is, and every edge feature is
    assigned after being indexed by the subgraph's ``dgl.EID`` edge data.

    ``utype`` and ``itype`` are accepted but not used by this function.
    """
    kept_edges = {etype: train_indices, etype_rev: train_indices}
    train_g = g.edge_subgraph(kept_edges, relabel_nodes=False)

    # Node features: relabel_nodes=False was requested, so they are assigned
    # without any re-indexing.
    for node_type in g.ntypes:
        for feat_name, feat in g.nodes[node_type].data.items():
            train_g.nodes[node_type].data[feat_name] = feat

    # Edge features: pick the rows given by the subgraph's dgl.EID data.
    for edge_type in g.etypes:
        for feat_name, feat in g.edges[edge_type].data.items():
            parent_eids = train_g.edges[edge_type].data[dgl.EID]
            train_g.edges[edge_type].data[feat_name] = feat[parent_eids]

    return train_g
def build_val_test_matrix(g, val_indices, test_indices, utype, itype, etype):
    """Build sparse validation and test matrices from edge IDs.

    Both results are ``scipy.sparse.coo_matrix`` objects of shape
    ``(g.num_nodes(utype), g.num_nodes(itype))``.  For every requested edge
    of type ``etype`` a value of 1 is stored at ``(source, destination)``.

    Returns:
        A tuple ``(val_matrix, test_matrix)``.
    """
    shape = (g.num_nodes(utype), g.num_nodes(itype))

    def _interaction_matrix(edge_ids):
        # Endpoints of the requested edges, converted to NumPy arrays.
        src, dst = (
            endpoint.numpy() for endpoint in g.find_edges(edge_ids, etype=etype)
        )
        return ssp.coo_matrix((np.ones_like(src), (src, dst)), shape)

    return _interaction_matrix(val_indices), _interaction_matrix(test_indices)
def linear_normalize(values):
    """Rescale ``values`` linearly along axis 0 into the range [0, 1].

    Along axis 0 the minimum maps to 0 and the maximum maps to 1.  A slice
    whose minimum equals its maximum (a constant column) maps to 0 instead
    of producing NaN from a 0 / 0 division.

    Args:
        values: Array whose ``min``/``max`` methods accept
            ``(0, keepdims=True)``, e.g. a NumPy array.

    Returns:
        The rescaled array, broadcast to the shape of ``values``.
    """
    lowest = values.min(0, keepdims=True)
    span = values.max(0, keepdims=True) - lowest
    # A zero range means every entry equals the minimum; divide by 1 so the
    # (all-zero) numerator yields 0 rather than NaN.
    span = np.where(span == 0, 1, span)
    return (values - lowest) / span