-
Notifications
You must be signed in to change notification settings - Fork 0
/
loader.py
104 lines (87 loc) · 2.99 KB
/
loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import itertools
from torch.utils.data import ConcatDataset
from pathlib import Path
import numpy as np
from dataloader import ImageStickDataset
from traverse_data import iter_dir_for_traj_pths
def get_image_stick_dataset_single(
    data_path,
    time_skip=4,
    time_offset=5,
    time_trim=5,
    img_size=224,
):
    """Build a ConcatDataset with one ImageStickDataset per trajectory.

    Each trajectory directory found under ``data_path`` contributes exactly
    one dataset (single sampling pass, one fixed time offset).

    Args:
        data_path: Root directory containing trajectory folders (str or Path).
        time_skip: Number of frames to skip between consecutive states.
        time_offset: Initial frames to crop out (to reduce noise).
        time_trim: Final frames to crop out (to reduce noise).
        img_size: Target image size; None keeps original resolution.

    Returns:
        torch.utils.data.ConcatDataset over all discovered trajectories.
    """
    # TODO: add transforms for normalization and converting to float tensor
    if isinstance(data_path, str):  # isinstance, not type()==, per Python idiom
        data_path = Path(data_path)
    # Empty masks: no trajectories are held out for validation.
    val_mask = {"home": [], "env": [], "traj": []}
    mask_texts = []
    train_traj_paths, _, _ = iter_dir_for_traj_pths(data_path, val_mask, mask_texts)
    train_dataset = ConcatDataset(
        [
            ImageStickDataset(
                traj_path,
                time_skip,
                time_offset,
                time_trim,
                img_size,
                pre_load=False,
                transforms=None,
            )
            for traj_path in train_traj_paths
        ]
    )
    return train_dataset
def get_image_stick_dataset_double(
    data_path,
    time_skip=4,
    time_offset=5,
    time_trim=5,
    img_size=224,
):
    """Build a ConcatDataset sampling each trajectory twice.

    Every trajectory is paired with two time offsets (``time_offset`` and
    ``time_offset + 2``) via itertools.product, so N demonstrations yield
    2*N member datasets. Use get_image_stick_dataset_single if each demo
    should appear only once.

    Args:
        data_path: Root directory containing trajectory folders (str or Path).
        time_skip: Number of frames to skip between consecutive states.
        time_offset: Base initial crop; the second pass uses time_offset + 2.
        time_trim: Final frames to crop out (to reduce noise).
        img_size: Target image size; None keeps original resolution.

    Returns:
        torch.utils.data.ConcatDataset with two entries per trajectory.
    """
    # TODO: add transforms for normalization and converting to float tensor
    if isinstance(data_path, str):  # isinstance, not type()==, per Python idiom
        data_path = Path(data_path)
    # Empty masks: no trajectories are held out for validation.
    val_mask = {"home": [], "env": [], "traj": []}
    mask_texts = []
    train_traj_paths, _, _ = iter_dir_for_traj_pths(data_path, val_mask, mask_texts)
    train_dataset = ConcatDataset(
        [
            ImageStickDataset(
                traj_path,
                time_skip,
                time_offset_n,
                time_trim,
                img_size,
                pre_load=False,
                transforms=None,
            )
            for traj_path, time_offset_n in itertools.product(
                train_traj_paths, [time_offset, time_offset + 2]
            )
        ]
    )
    return train_dataset
if __name__ == "__main__":
# the following code loads the dataset. In this we sample each expert demonstration twice.
# So if there are 10 demonstrations, we have 20 trajectories in the dataset.
# This done by sampling each demo by 2 different time offsets but similar time skips.
# If this is not desired, use get_image_stick_dataset_single instead.
dataset = get_image_stick_dataset_double(
"./CDS_Home",
time_skip=4, # number of frames to skip for each state in a trajectory
time_offset=5, # intial frames to crop out (to reduce noise)
time_trim=5, # final frames to crop out (to reduce noise)
img_size=None, # with None, images are loaded at original size. Can be set to 224 for resnet
)
array_of_trajectories = np.asarray([])
task = np.asarray(["door opening"])
for i in range(20):
dat = dataset[i]
trajectory = {
"action": dat[1],
"image": dat[0].numpy(),
"task": task,
}
array_of_trajectories = np.append(array_of_trajectories, trajectory)
np.save("./door_opening_data.npy", array_of_trajectories)