fixed indexing of samples in pytorch.
hariharan-devarajan committed Mar 10, 2023
1 parent 8521e96 commit 8fc40ce
Showing 9 changed files with 24 additions and 19 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/python-package-conda.yml
@@ -88,11 +88,11 @@ jobs:
         run: |
           touch __init__.py
           export PYTHONPATH=./:$PYTHONPATH
-          RDMAV_FORK_SAFE=1 mpirun -np 2 python ./src/dlio_benchmark.py workload=unet3d ++workload.train.computation_time=0.05 ++workload.evaluation.eval_time=0.01 ++workload.train.epochs=2 ++workload.workflow.train=False ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=32 ++workload.dataset.num_files_eval=32
-          RDMAV_FORK_SAFE=1 mpirun -np 2 python ./src/dlio_benchmark.py workload=unet3d ++workload.train.computation_time=0.05 ++workload.evaluation.eval_time=0.01 ++workload.train.epochs=2 ++workload.workflow.train=True ++workload.workflow.generate_data=False ++workload.dataset.num_files_train=32 ++workload.dataset.num_files_eval=32
+          RDMAV_FORK_SAFE=1 mpirun -np 2 python ./src/dlio_benchmark.py workload=unet3d ++workload.train.computation_time=0.05 ++workload.evaluation.eval_time=0.01 ++workload.train.epochs=2 ++workload.workflow.train=False ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=16 ++workload.dataset.num_files_eval=16 ++workload.reader.read_threads=2 ++workload.dataset.record_length=4096 ++workload.dataset.record_length_stdev=0
+          RDMAV_FORK_SAFE=1 mpirun -np 2 python ./src/dlio_benchmark.py workload=unet3d ++workload.train.computation_time=0.05 ++workload.evaluation.eval_time=0.01 ++workload.train.epochs=2 ++workload.workflow.train=True ++workload.workflow.generate_data=False ++workload.dataset.num_files_train=16 ++workload.dataset.num_files_eval=16 ++workload.reader.read_threads=2 ++workload.dataset.record_length=4096 ++workload.dataset.record_length_stdev=0
       - name: test-tf-loader-npz
         run: |
           touch __init__.py
           export PYTHONPATH=./:$PYTHONPATH
-          RDMAV_FORK_SAFE=1 mpirun -np 2 python ./src/dlio_benchmark.py workload=unet3d ++workload.framework=tensorflow ++workload.data_reader.data_loader=tensorflow ++workload.train.computation_time=0.05 ++workload.evaluation.eval_time=0.01 ++workload.train.epochs=2 ++workload.workflow.train=False ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=32 ++workload.dataset.num_files_eval=32
-          RDMAV_FORK_SAFE=1 mpirun -np 2 python ./src/dlio_benchmark.py workload=unet3d ++workload.framework=tensorflow ++workload.data_reader.data_loader=tensorflow ++workload.train.computation_time=0.05 ++workload.evaluation.eval_time=0.01 ++workload.train.epochs=2 ++workload.workflow.train=True ++workload.workflow.generate_data=False ++workload.dataset.num_files_train=32 ++workload.dataset.num_files_eval=32
+          RDMAV_FORK_SAFE=1 mpirun -np 2 python ./src/dlio_benchmark.py workload=unet3d ++workload.framework=tensorflow ++workload.data_reader.data_loader=tensorflow ++workload.train.computation_time=0.05 ++workload.evaluation.eval_time=0.01 ++workload.train.epochs=2 ++workload.workflow.train=False ++workload.workflow.generate_data=True ++workload.dataset.num_files_train=16 ++workload.dataset.num_files_eval=16 ++workload.reader.read_threads=2 ++workload.dataset.record_length=4096 ++workload.dataset.record_length_stdev=0
+          RDMAV_FORK_SAFE=1 mpirun -np 2 python ./src/dlio_benchmark.py workload=unet3d ++workload.framework=tensorflow ++workload.data_reader.data_loader=tensorflow ++workload.train.computation_time=0.05 ++workload.evaluation.eval_time=0.01 ++workload.train.epochs=2 ++workload.workflow.train=True ++workload.workflow.generate_data=False ++workload.dataset.num_files_train=16 ++workload.dataset.num_files_eval=16 ++workload.reader.read_threads=2 ++workload.dataset.record_length=4096 ++workload.dataset.record_length_stdev=0
1 change: 1 addition & 0 deletions src/data_loader/torch_data_loader.py
@@ -55,6 +55,7 @@ def read(self, epoch_number):
         self.epoch_number = epoch_number
         do_shuffle = True if self.sample_shuffle != Shuffle.OFF else False
         num_samples = self.num_samples * len(self._file_list)
+        num_samples = int(math.ceil(num_samples))
         dataset = TorchDataset(self.format, self.dataset_type, epoch_number, num_samples, self.read_threads)
         # TODO: In image segmentation, the distributed sampler is not used during eval, we could parametrize this away if needed
         # This handles the partitioning between ranks
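For context, the num_samples computed above becomes the length the PyTorch dataset reports, and the TODO refers to partitioning samples across ranks with torch.utils.data.distributed.DistributedSampler. A minimal sketch of that pattern, with a hypothetical toy dataset and hard-coded world size (illustration only, not code from this commit):

import torch
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler

class ToyDataset(Dataset):
    # Stands in for TorchDataset; a real reader would load sample `index` from disk.
    def __init__(self, num_samples):
        self.num_samples = num_samples
    def __len__(self):
        return self.num_samples      # plays the role of num_samples above
    def __getitem__(self, index):
        return torch.tensor(index)

dataset = ToyDataset(num_samples=64)
# Passing num_replicas/rank explicitly avoids needing an initialized process group.
sampler = DistributedSampler(dataset, num_replicas=2, rank=0, shuffle=True)
sampler.set_epoch(0)  # reseeds the shuffle each epoch
loader = DataLoader(dataset, batch_size=4, sampler=sampler)
for batch in loader:
    pass  # rank 0 iterates a disjoint 32-sample shard; rank 1 gets the rest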
5 changes: 3 additions & 2 deletions src/reader/csv_reader.py
@@ -110,7 +110,8 @@ def next(self):
 
     @perftrace.event_logging
     def read_index(self, index):
-        file_index = math.floor(index / self.num_samples)
+        relative_index = index - self.global_sample_start_index
+        file_index = int(math.floor(relative_index / self.num_samples)) % len(self._dataset)
         element_index = index % self.num_samples
         if self.read_type is ReadType.ON_DEMAND or self._dataset[file_index]["data"] is None:
             self._dataset[file_index]['data'] = pd.read_csv(self._dataset[file_index]["file"], compression="infer").to_numpy()
@@ -124,4 +124,4 @@ def read_index(self, index):
 
     @perftrace.event_logging
     def get_sample_len(self):
-        return self.num_samples * len(self._local_file_list)
+        return self.samples_per_reader
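The change above is the heart of the fix: a reader now subtracts its own global_sample_start_index before locating the file, and wraps with a modulo over its local file list, so the global sample indices handed out by the loader land inside the shard this reader actually holds. A worked illustration with hypothetical numbers (4 samples per file, a shard starting at global index 8, 2 local files):

import math

num_samples = 4                 # samples per file (hypothetical)
global_sample_start_index = 8   # first global index owned by this reader
local_dataset_len = 2           # len(self._dataset) for this reader

def map_index(index):
    relative_index = index - global_sample_start_index
    file_index = int(math.floor(relative_index / num_samples)) % local_dataset_len
    element_index = index % num_samples
    return file_index, element_index

for index in range(8, 16):
    print(index, map_index(index))
# Global indices 8-11 resolve to local file 0, 12-15 to local file 1, each with
# element indices 0-3. The old math.floor(index / num_samples) would have
# produced file indices 2 and 3, which this reader does not hold.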
5 changes: 3 additions & 2 deletions src/reader/hdf5_reader.py
@@ -116,7 +116,8 @@ def next(self):
 
     @perftrace.event_logging
     def read_index(self, index):
-        file_index = math.floor(index / self.num_samples)
+        relative_index = index - self.global_sample_start_index
+        file_index = int(math.floor(relative_index / self.num_samples)) % len(self._dataset)
         element_index = index % self.num_samples
         if self.read_type is ReadType.ON_DEMAND or self._dataset[file_index]["data"] is None:
             file_h5 = h5py.File(self._dataset[file_index]["file"], 'r')
@@ -131,4 +132,4 @@ def read_index(self, index):
 
     @perftrace.event_logging
     def get_sample_len(self):
-        return self.num_samples * len(self._local_file_list)
+        return self.samples_per_reader
2 changes: 1 addition & 1 deletion src/reader/jpeg_reader.py
@@ -80,4 +80,4 @@ def read_index(self, index):
 
     @perftrace.event_logging
    def get_sample_len(self):
-        return self.num_samples * len(self._local_file_list)
+        return self.samples_per_reader
6 changes: 4 additions & 2 deletions src/reader/npz_reader.py
@@ -112,7 +112,9 @@ def next(self):
 
     @perftrace.event_logging
     def read_index(self, index):
-        file_index = math.floor(index / self.num_samples)
+        logging.info(f"{utcnow()} reading image {index} thread {self.thread_index} rank {self.my_rank}")
+        relative_index = index - self.global_sample_start_index
+        file_index = int(math.floor(relative_index / self.num_samples)) % len(self._dataset)
         if self.read_type is ReadType.ON_DEMAND or self._dataset[file_index]["data"] is None:
             with np.load(self._dataset[file_index]["file"], allow_pickle=True) as data:
                 self._dataset[file_index]['data'] = data["x"]
@@ -127,4 +129,4 @@ def read_index(self, index):
 
     @perftrace.event_logging
     def get_sample_len(self):
-        return self.num_samples * len(self._local_file_list)
+        return self.samples_per_reader
2 changes: 1 addition & 1 deletion src/reader/png_reader.py
@@ -82,4 +82,4 @@ def read_index(self, index):
 
     @perftrace.event_logging
     def get_sample_len(self):
-        return self.num_samples * len(self._local_file_list)
+        return self.samples_per_reader
12 changes: 6 additions & 6 deletions src/reader/reader_handler.py
@@ -117,14 +117,14 @@ def __init__(self, dataset_type, thread_index):
             thread_rank = self.my_rank * self.read_threads + thread_index
             thread_comm_size = self.comm_size * self.read_threads
             self.samples_per_reader = int(math.ceil(total_samples / thread_comm_size))
-            global_sample_start_index = thread_rank * self.samples_per_reader
-            global_sample_end_index = (thread_rank + 1) * self.samples_per_reader - 1
+            self.global_sample_start_index = thread_rank * self.samples_per_reader
+            self.global_sample_end_index = (thread_rank + 1) * self.samples_per_reader - 1
         else:
             self.samples_per_reader = int(math.ceil(total_samples / self.comm_size))
-            global_sample_start_index = self.my_rank * self.samples_per_reader
-            global_sample_end_index = (self.my_rank + 1) * self.samples_per_reader - 1
-        self.file_start_index = int(math.floor(global_sample_start_index / self.num_samples))
-        self.file_end_index = int(math.floor(global_sample_end_index / self.num_samples))
+            self.global_sample_start_index = self.my_rank * self.samples_per_reader
+            self.global_sample_end_index = (self.my_rank + 1) * self.samples_per_reader - 1
+        self.file_start_index = int(math.floor(self.global_sample_start_index / self.num_samples))
+        self.file_end_index = int(math.floor(self.global_sample_end_index / self.num_samples))
         self._local_file_list = self._file_list[self.file_start_index:self.file_end_index]
 
         logging.info(f"{utcnow()} samples_per_reader {self.samples_per_reader} from files {self.file_start_index} to {self.file_end_index} for thread {self.thread_index} on rank {self.my_rank}")
2 changes: 1 addition & 1 deletion src/reader/tf_reader.py
@@ -116,4 +116,4 @@ def read_index(self, index):
 
     @perftrace.event_logging
     def get_sample_len(self):
-        return self.num_samples * len(self._local_file_list)
+        return self.samples_per_reader
