
Hyperopt steps per epoch not being computed correctly #2175

Merged

This change switches the Ray dataset shard from DatasetPipeline.iter_datasets() to DatasetPipeline.iter_epochs(), so the dataset length (and therefore samples per epoch and steps per epoch) is computed from a full epoch of the pipeline rather than from a single window.

ludwig/data/dataset/ray.py (14 additions and 14 deletions)
@@ -156,12 +156,12 @@ def __init__(
         self.dataset_shard = dataset_shard
         self.features = features
         self.training_set_metadata = training_set_metadata
-        self.dataset_iter = dataset_shard.iter_datasets()
+        self.epoch_iter = dataset_shard.iter_epochs()

     @contextlib.contextmanager
     def initialize_batcher(self, batch_size=128, should_shuffle=True, seed=0, ignore_last=False, horovod=None):
         yield RayDatasetBatcher(
-            self.dataset_iter,
+            self.epoch_iter,
             self.features,
             self.training_set_metadata,
             batch_size,
@@ -171,7 +171,7 @@ def initialize_batcher(self, batch_size=128, should_shuffle=True, seed=0, ignore_last=False, horovod=None):
     @lru_cache(1)
     def __len__(self):
         # TODO(travis): find way to avoid calling this, as it's expensive
-        return next(self.dataset_iter).count()
+        return next(self.epoch_iter).count()

     @property
     def size(self):
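Note on the change above: iter_epochs() yields one pipeline per pass over the data, while iter_datasets() yields individual pipeline windows, so counting the latter can undercount an epoch. Below is a standalone sketch of the difference, assuming the Ray 1.x DatasetPipeline API (range, window, repeat, iter_epochs, count); the names are illustrative, not Ludwig code:

    # Illustrative sketch, not part of this diff. Assumes Ray 1.x DatasetPipeline API.
    import ray

    ray.init(ignore_reinit_error=True)

    # Build a windowed, repeating pipeline similar in shape to a training dataset shard.
    pipe = ray.data.range(1000).window(blocks_per_window=2).repeat()

    # iter_epochs() yields one DatasetPipeline per epoch, so counting it returns the
    # number of samples in a full pass over the data.
    samples_per_epoch = next(pipe.iter_epochs()).count()  # 1000

    # iter_datasets() would instead yield single windows of the pipeline; counting one
    # window reports only a fraction of the epoch, which is what skewed steps per epoch.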
@@ -181,7 +181,7 @@ def size(self):
 class RayDatasetBatcher(Batcher):
     def __init__(
         self,
-        dataset_epoch_iterator: Iterator[ray.data.Dataset],
+        dataset_epoch_iterator: Iterator[DatasetPipeline],
         features: Dict[str, Dict],
         training_set_metadata: Dict[str, Any],
         batch_size: int,
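The new Iterator[DatasetPipeline] annotation assumes DatasetPipeline is imported at the top of the module; in Ray 1.x either import path below works (shown here as an assumption, the file's actual imports may differ):

    from typing import Any, Dict, Iterator

    from ray.data.dataset_pipeline import DatasetPipeline
    # or equivalently: from ray.data import DatasetPipeline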
@@ -233,17 +233,17 @@ def steps_per_epoch(self):
         return math.ceil(self.samples_per_epoch / self.batch_size)

     def _fetch_next_epoch(self):
-        dataset = next(self.dataset_epoch_iterator)
+        pipeline = next(self.dataset_epoch_iterator)

         read_parallelism = 1
         if read_parallelism == 1:
-            self.dataset_batch_iter = self._create_async_reader(dataset)
+            self.dataset_batch_iter = self._create_async_reader(pipeline)
         elif read_parallelism > 1:
             # TODO: consider removing this. doesn't work currently and read performance seems generally
             # very good with 1 parallelism
-            self.dataset_batch_iter = self._create_async_parallel_reader(dataset, read_parallelism)
+            self.dataset_batch_iter = self._create_async_parallel_reader(pipeline, read_parallelism)
         else:
-            self.dataset_batch_iter = self._create_sync_reader(dataset)
+            self.dataset_batch_iter = self._create_sync_reader(pipeline)

         self._step = 0
         self._fetch_next_batch()
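For reference, the steps_per_epoch formula above is a simple ceiling division; a quick worked example with made-up numbers:

    import math

    samples_per_epoch = 1000   # hypothetical epoch size
    batch_size = 128
    steps_per_epoch = math.ceil(samples_per_epoch / batch_size)  # ceil(7.8125) == 8
    assert steps_per_epoch == 8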
@@ -285,26 +285,26 @@ def _prepare_batch(self, batch: pd.DataFrame) -> Dict[str, np.ndarray]:

         return res

-    def _create_sync_reader(self, dataset: ray.data.Dataset):
+    def _create_sync_reader(self, pipeline: DatasetPipeline):
         to_tensors = self._to_tensors_fn()

         def sync_read():
-            for batch in dataset.map_batches(to_tensors, batch_format="pandas").iter_batches(
+            for batch in pipeline.map_batches(to_tensors, batch_format="pandas").iter_batches(
                 prefetch_blocks=0, batch_size=self.batch_size, batch_format="pandas"
             ):
                 yield self._prepare_batch(batch)

         return sync_read()

-    def _create_async_reader(self, dataset: ray.data.Dataset):
+    def _create_async_reader(self, pipeline: DatasetPipeline):
         q = queue.Queue(maxsize=100)

         batch_size = self.batch_size

         to_tensors = self._to_tensors_fn()

         def producer():
-            for batch in dataset.map_batches(to_tensors, batch_format="pandas").iter_batches(
+            for batch in pipeline.map_batches(to_tensors, batch_format="pandas").iter_batches(
                 prefetch_blocks=0, batch_size=batch_size, batch_format="pandas"
             ):
                 res = self._prepare_batch(batch)
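The async reader renamed above is a bounded-queue producer/consumer: a background thread maps batches to tensors and pushes prepared batches into a queue, and the training loop pulls from it. A minimal standalone sketch of that pattern (generic names, not the Ludwig implementation):

    import queue
    import threading

    def make_async_reader(batches, maxsize=100):
        """Read `batches` (any iterable) on a background thread and yield them lazily."""
        q = queue.Queue(maxsize=maxsize)   # bounded so the producer cannot run far ahead
        sentinel = object()                # marks the end of the stream

        def producer():
            for batch in batches:
                q.put(batch)               # blocks when the queue is full (backpressure)
            q.put(sentinel)

        threading.Thread(target=producer, daemon=True).start()

        def consumer():
            while True:
                item = q.get()
                if item is sentinel:
                    return
                yield item

        return consumer()

    # Usage: wrap any iterator of prepared batches.
    for batch in make_async_reader(range(5)):
        print(batch)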
@@ -323,13 +323,13 @@ def async_read():

         return async_read()

-    def _create_async_parallel_reader(self, dataset: ray.data.Dataset, num_threads: int):
+    def _create_async_parallel_reader(self, pipeline: DatasetPipeline, num_threads: int):
         q = queue.Queue(maxsize=100)

         batch_size = self.batch_size

         to_tensors = self._to_tensors_fn()
-        splits = dataset.split(n=num_threads)
+        splits = pipeline.split(n=num_threads)

         def producer(i):
             for batch in (
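In the parallel path, DatasetPipeline.split(n=num_threads) yields n disjoint sub-pipelines, one per reader thread. A rough sketch of that idea, assuming the Ray 1.x API and that each split is consumed by its own thread (pipeline splits are meant to be read concurrently):

    import threading

    import ray

    pipe = ray.data.range(1000).window(blocks_per_window=2)
    shards = pipe.split(n=2)  # two disjoint sub-pipelines over the same stream
    counts = [0] * len(shards)

    def consume(i, shard):
        # In the real code each shard feeds a producer thread; here we just count rows.
        counts[i] = shard.count()

    threads = [threading.Thread(target=consume, args=(i, s)) for i, s in enumerate(shards)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(sum(counts))  # 1000 across both shards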