Skip to content

Commit

Permalink
Merge pull request #63 from zhenghh04/main
Browse files Browse the repository at this point in the history
Adding support for training on a subset of dataset
  • Loading branch information
zhenghh04 authored May 5, 2023
2 parents 88613b5 + c6a51fe commit c9fbbe6
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 11 deletions.
15 changes: 10 additions & 5 deletions src/data_generator/data_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from shutil import copyfile
import numpy as np
import logging
from src.utils.utility import utcnow
from src.utils.utility import utcnow, add_padding


class DataGenerator(ABC):
Expand Down Expand Up @@ -82,21 +82,26 @@ def generate(self):
if self.num_files_eval > 0:
self.total_files_to_generate += self.num_files_eval
self._file_list = []
nd_f_train = len(str(self.num_files_train))
nd_f_eval = len(str(self.num_files_eval))
nd_sf_train = len(str(self.num_subfolders_train))
nd_sf_eval = len(str(self.num_subfolders_eval))

if self.num_subfolders_train > 1:
ns = np.ceil(self.num_files_train / self.num_subfolders_train)
for i in range(self.num_files_train):
file_spec = "{}/train/{}/{}_{}_of_{}.{}".format(self.data_dir, int(i//ns), self.file_prefix, i, self.num_files_train, self.format)
file_spec = "{}/train/{}/{}_{}_of_{}.{}".format(self.data_dir, add_padding(i%self.num_subfolders_train, nd_sf_train), self.file_prefix, add_padding(i, nd_f_train), self.num_files_train, self.format)
self._file_list.append(file_spec)
else:
for i in range(self.num_files_train):
file_spec = "{}/train/{}_{}_of_{}.{}".format(self.data_dir, self.file_prefix, i, self.num_files_train, self.format)
file_spec = "{}/train/{}_{}_of_{}.{}".format(self.data_dir, self.file_prefix, add_padding(i, nd_f_train), self.num_files_train, self.format)
self._file_list.append(file_spec)
if self.num_subfolders_eval > 1:
ns = np.ceil(self.num_files_eval / self.num_subfolders_eval)
for i in range(self.num_files_eval):
file_spec = "{}/valid/{}/{}_{}_of_{}.{}".format(self.data_dir, int(i//ns), self.file_prefix, i, self.num_files_eval, self.format)
file_spec = "{}/valid/{}/{}_{}_of_{}.{}".format(self.data_dir, add_padding(i%self.num_subfolders_eval, nd_sf_eval), self.file_prefix, add_padding(i, nd_f_eval), self.num_files_eval, self.format)
self._file_list.append(file_spec)
else:
for i in range(self.num_files_eval):
file_spec = "{}/valid/{}_{}_of_{}.{}".format(self.data_dir, self.file_prefix, i, self.num_files_eval, self.format)
file_spec = "{}/valid/{}_{}_of_{}.{}".format(self.data_dir, self.file_prefix, add_padding(i, nd_f_eval), self.num_files_eval, self.format)
self._file_list.append(file_spec)
18 changes: 13 additions & 5 deletions src/dlio_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ def __init__(self, cfg):

self.data_generator = None
self.num_files_train = self.args.num_files_train
self.num_subfolders_train = self.args.num_subfolders_train
self.num_subfolders_eval = self.args.num_subfolders_eval
self.num_samples = self.args.num_samples_per_file
self.total_training_steps = self.args.total_training_steps

Expand Down Expand Up @@ -187,13 +189,19 @@ def initialize(self):
if self.storage.get_node(
os.path.join(self.args.data_folder, f"{dataset_type}",
filenames[0])) == MetadataType.DIRECTORY:
fullpaths = self.storage.walk_node(os.path.join(self.args.data_folder, f"{dataset_type}/*/*"),
if dataset_type==DatasetType.TRAIN:
assert(self.num_subfolders_train == len(filenames))
elif dataset_type==DatasetType.VALID:
assert(self.num_subfolders_eval == len(filenames))
fullpaths = self.storage.walk_node(os.path.join(self.args.data_folder, f"{dataset_type}/*/*.{self.args.format}"),
use_pattern=True)
files = [self.storage.get_basename(f) for f in fullpaths]
idx = np.argsort(files)
fullpaths = [fullpaths[i] for i in idx]
else:
fullpaths = [self.storage.get_uri(os.path.join(self.args.data_folder, f"{dataset_type}", entry))
for entry
in filenames]
fullpaths = [f for f in fullpaths if f.find(f'{self.args.format}')!=-1]
for entry in filenames if entry.find(f'{self.args.format}')!=-1]
fullpaths = sorted(fullpaths)
if dataset_type is DatasetType.TRAIN:
file_list_train = fullpaths
elif dataset_type is DatasetType.VALID:
Expand All @@ -207,7 +215,7 @@ def initialize(self):
file_list_train = file_list_train[:self.num_files_train]
if (self.num_files_eval < len(file_list_eval)):
logging.warning(f"Number of files for evaluation in {os.path.join(self.args.data_folder, f'{DatasetType.VALID}')} ({len(file_list_eval)}) is more than requested ({self.num_files_eval}). A subset of files will be used ")
file_list_eval = file_list_train[:self.num_files_eval]
file_list_eval = file_list_eval[:self.num_files_eval]
self.args.derive_configurations(file_list_train, file_list_eval)
self.args.validate()
self.framework.barrier()
Expand Down
3 changes: 3 additions & 0 deletions src/storage/file_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,6 @@ def get_data(self, id, data, offset=None, length=None):
with open(self.get_uri(id), "r") as fd:
data = fd.read()
return data

def get_basename(self, id):
    """Return the final path component of *id* (the bare file name)."""
    # os.path.split returns (head, tail); the tail is exactly the basename.
    _, tail = os.path.split(id)
    return tail
3 changes: 3 additions & 0 deletions src/storage/s3_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ def put_data(self, id, data, offset=None, length=None):
@dlp.log
def get_data(self, id, data, offset=None, length=None):
    # Resolve the object id to its full URI via get_uri, then delegate the
    # actual read to the parent class implementation unchanged.
    # NOTE(review): offset/length are forwarded as-is; whether partial reads
    # are honored depends on the superclass — confirm against its contract.
    return super().get_data(self.get_uri(id), data, offset, length)

def get_basename(self, id):
    """Strip any directory prefix from *id* and return the file name only."""
    # Equivalent to os.path.basename(id): keep only the tail component.
    return os.path.split(id)[1]
4 changes: 3 additions & 1 deletion src/utils/statscounter.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ def __init__(self):
self.summary['num_accelerators'] = self.comm_size
self.summary['hostname'] = socket.gethostname()
self.summary['metric'] = {}

self.summary['num_files_train'] = self.args.num_files_train
self.summary['num_files_eval'] = self.args.num_files_eval
self.summary['num_samples_per_file'] = self.args.num_samples_per_file
max_steps = math.floor(self.args.num_samples_per_file * self.args.num_files_train / self.args.batch_size / self.args.comm_size)

if self.args.total_training_steps > 0:
Expand Down
8 changes: 8 additions & 0 deletions src/utils/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@
LOG_TS_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
from mpi4py import MPI

def add_padding(n, num_digits=None):
    """Return ``str(n)`` left-padded with zeros to ``num_digits`` characters.

    Used to build lexicographically sortable file names (e.g. ``007`` sorts
    before ``010``), so that a plain string sort of generated file names
    matches their numeric order.

    :param n: value to stringify (typically a non-negative file index).
    :param num_digits: total width of the result; when ``None``, or when
        ``str(n)`` is already at least that wide, the string is returned
        unpadded.
    :return: zero-padded string representation of ``n``.
    """
    str_out = str(n)
    # PEP 8: compare against None with `is not`, never `!=`.
    if num_digits is not None:
        return str_out.rjust(num_digits, "0")
    return str_out



def utcnow(format=LOG_TS_FORMAT):
    """Return the current timestamp rendered with *format*.

    NOTE(review): despite the name, ``datetime.now()`` yields *local* time,
    not UTC — confirm whether callers rely on local-time log stamps before
    renaming or switching to an aware UTC datetime.
    """
    current_time = datetime.now()
    return current_time.strftime(format)
Expand Down

0 comments on commit c9fbbe6

Please sign in to comment.