Hyperopt steps per epoch not being computed correctly #2175
Conversation
ludwig/backend/ray.py (Outdated)
@@ -259,8 +267,11 @@ def tune_learning_rate_fn(
    initialize_pytorch(horovod=hvd)

    pipe = dataset.pipeline(shuffle=False, **data_loader_kwargs)

    # Expensive blocking call
Is this because of the new len(dataset), or was this always an expensive blocking call?
len(dataset) does a full iteration over the dataset (as it does not know the size a priori).
@justinxzhao It was always an expensive blocking call. I think calling the count() method on a Ray dataset or dataset pipeline is blocking and forces a full iteration over the dataset, like Travis mentioned.
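A minimal sketch of the cost in question, using a toy ray.data.range dataset as a stand-in for the real Ludwig dataset (the names below are illustrative, not the PR's code):

import ray

ds = ray.data.range(1_000)        # toy stand-in for the real dataset
train_dataset_size = ds.count()   # blocking: forces a full pass over the data
# Computing the size once up front and reusing it avoids repeated blocking counts.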
ludwig/backend/ray.py (Outdated)
@@ -160,6 +163,7 @@ def train_fn(
    train_shard = RayDatasetShard(
        rt.get_dataset_shard("train"),
        train_dataset_size,
This is the dataset size before being split among the workers. Have you tested this with multiple training workers?
I believe we need to also divide this by the number of workers, plus account for rounding (see the Ray Datasets implementation of split()).
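For illustration only (not Ray's actual split() implementation), a rough sketch of how rows could be divided across workers, with the remainder handled by rounding:

def approx_shard_sizes(num_rows: int, num_workers: int) -> list:
    # Spread num_rows across num_workers shards, giving the remainder to the first shards.
    base, remainder = divmod(num_rows, num_workers)
    return [base + (1 if i < remainder else 0) for i in range(num_workers)]

print(approx_shard_sizes(100, 4))  # [25, 25, 25, 25]
print(approx_shard_sizes(103, 4))  # [26, 26, 26, 25]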
@tgaddair This would just be setting

backend:
  trainer:
    num_workers: 2

or

backend:
  trainer:
    num_workers: 4

right? If yes, then this still works as intended with this fix.
I think the reason multiple workers isn't an issue is that we call the ray.data.Dataset.split() method on the DatasetPipeline objects, splitting the pipeline amongst multiple workers rather than the dataset itself. So as long as we use the size of the dataset directly, we should be okay.
Would love it if someone could double-check this understanding. @ShreyaR are you able to pull this branch and see if these changes fix the problem you were seeing before?
I suppose what I don't understand is that the number of steps per epoch is dependent on the number of workers. So if you have 100 batches and 4 workers, there should be 25 steps per epoch. So how do we account for this in this approach?
In the previous implementation here, you can see that we're taking the length of the dataset pipeline after it has been split, which accounts for the number of workers.
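A tiny worked version of the arithmetic above (the helper name and ceiling rounding are illustrative assumptions):

import math

def per_worker_steps(total_batches: int, num_workers: int) -> int:
    # Each worker processes roughly an equal share of the batches per epoch.
    return math.ceil(total_batches / num_workers)

assert per_worker_steps(100, 4) == 25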
@tgaddair Gotcha, that makes sense. I'm able to reproduce what you said and can see that this implementation doesn't account for the number of workers and rounding. Will look into this again and create a fix that is in line with what you described.
I wonder if it would be possible to window after splitting to work around this? Seems like it should be the same effect, more or less.
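A hypothetical sketch of that ordering with a toy dataset (block count, worker count, and window size are made up): split the ray.data.Dataset across workers first, then window each shard, so each pipeline's length reflects only that worker's share.

import ray

num_workers = 4
ds = ray.data.range(1_000).repartition(8)                     # toy dataset with 8 blocks
shards = ds.split(n=num_workers, equal=True)                  # split across workers first
pipelines = [s.window(blocks_per_window=2) for s in shards]   # then window each shard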
I spent some more time investigating this issue today. I've been using the 100MB dataset. A few things I noticed:

To account for these observations, my latest push keeps track of whether windowing is being used and the number of partitions/blocks in the dataset within the RayDataset class. This is then factored in when calculating the size of each RayDatasetShard. This seems to fix the issue for … Will continue to look into why this is happening.
@arnavgarg1 not sure why the evaluation epochs would get messed up. A few things to try out:

This update to …
I'm not sure if tracking the total number of partitions and calculating the size of the dataset by multiplying by the total number of partitions is the way to go.
I like @tgaddair's suggestion of doing pipelining on each shard instead of on the entire dataset. Here are some code changes you can make to achieve that:
You can update RayTrainerV2's train function to pass in RayDataset objects instead of DatasetPipeline objects.
dataset = {"train": training_set}
if validation_set is not None:
    dataset["val"] = validation_set
if test_set is not None:
    dataset["test"] = test_set
You will also need to update RayDatasetShard so that it expects a RayDataset instead of a DatasetPipeline.
class RayDatasetShard(Dataset):
    def __init__(...):
        self.dataset_iter = dataset_shard.pipeline(...).iter_datasets()

    @lru_cache(1)
    def __len__(self):
        # TODO(travis): find way to avoid calling this, as it's expensive
        return self.dataset_shard.count()
You may need to make some other changes and do some general bookkeeping, but that's the broad idea. Let me know if this makes sense.
ludwig/data/dataset/ray.py (Outdated)
-        return next(self.dataset_iter).count()
+        next_iteration_length = next(self.dataset_iter).count()
+        if self.num_dataset_partitions > 1 and self.window_status:
+            return next_iteration_length * self.num_dataset_partitions
My concern with this solution is that this might not be accurate -- two partitions may not have the same number of examples.
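A small made-up example of that failure mode, with uneven partition sizes:

block_sizes = [40, 35, 25]                    # 3 partitions with uneven row counts
estimate = block_sizes[0] * len(block_sizes)  # 40 * 3 = 120
actual = sum(block_sizes)                     # 100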
@ShreyaR This makes a lot of sense and is also very clean, thank you for your help! I will update this today and see if it fixes the issue that we were seeing.
Merge branch '2144-hyperopt-steps-per-epoch-not-being-computed-correctly' of https://github.com/ludwig-ai/ludwig into 2144-hyperopt-steps-per-epoch-not-being-computed-correctly
This PR has been updated now to correctly read batches from iter_epochs. I've tested it with multiple workers and multiple window sizes and it works as intended. Should be good to go - thanks for all your help @ShreyaR @tgaddair @justinxzhao
The biggest change is to pull batches from the epoch iterator, which creates a pipeline over the windows. Prior to this, we were pulling batches from the windowed dataset directly, which caused fewer steps per epoch since that windowed dataset was only a subset of the overall dataset.
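A hypothetical, self-contained sketch of that read pattern with a toy ray.data dataset (window size, repeat count, and batch size are arbitrary; the real change lives in Ludwig's Ray dataset code):

import ray

ds = ray.data.range(1_000)
pipe = ds.window(blocks_per_window=2).repeat(2)    # pipeline over windows, repeated per epoch
for epoch_pipe in pipe.iter_epochs():              # one sub-pipeline per full pass over the data
    for batch in epoch_pipe.iter_batches(batch_size=128):
        pass                                       # placeholder for the actual training step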
* getting count of dataset instead of window
* read length from dataset instead of pipeline
* Removing older commented code
* Refactor
* Removing dead code
* Modify RayDatasetShard length to factor in windowing and dataset partitions
* Moving to iter_epochs()
* Working fix
Following up on this issue: when using windowing in the backend over a partitioned dataset (more than 1 partition), we observed that the steps_per_epoch was being under-calculated. This resulted in each epoch comprising only a fraction of the dataset.

This PR modifies the RayDatasetShard class to use the size of the RayDataset, rather than the size of the DatasetPipeline, to calculate the number of steps per epoch, bringing it back to what we'd expect.

Validation that this change works:

Before, running train_cli() with the dataset resulted in:

Now, running train_cli() with the dataset results in: