
[Data] Allow unknown estimate of operator output bundles and ProgressBar totals #46601

Merged
11 commits merged into ray-project:master on Jul 17, 2024

Conversation

scottjlee (Contributor) commented on Jul 12, 2024:

Ray Data initially assumes that each read task produces exactly one block. Furthermore, one-to-one operators assume that their number of output blocks equals that of their upstream operator. Neither assumption is always accurate, which results in misleading progress bar estimates and can cause confusion. This PR updates PhysicalOperator.num_outputs_total() to allow the estimated number of output bundles to be unknown, which is the case when no tasks have finished yet and it is not possible to provide a reasonable estimate.
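To make "unknown" concrete: when no reasonable estimate exists yet, num_outputs_total() reports nothing rather than guessing, and the progress bar can display an open-ended total. A minimal consumer-side sketch of that idea (the function name and formatting below are illustrative assumptions, not Ray's actual ProgressBar code):

from typing import Optional

def render_total(completed: int, total: Optional[int]) -> str:
    # With no estimate yet (total is None), show an open-ended count
    # instead of a misleading total based on "one block per read task".
    return f"{completed}/{total}" if total is not None else f"{completed}/?"

print(render_total(3, None))   # -> 3/?
print(render_total(3, 1000))   # -> 3/1000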

For example, given the following reproducible script:

import time
import numpy as np
import ray
ray.init(num_cpus=1)

target_block_size = ray.data.DataContext.get_current().target_max_block_size

def sleep(batch):
    # Each map_batches task yields 100 outputs of ~target_max_block_size bytes,
    # i.e. roughly 100 output blocks per task, so the initial assumption of one
    # output block per task is far off.
    for _ in range(100):
        time.sleep(0.1)
        yield {"batch": np.zeros((target_block_size,), dtype=np.uint8)}

ray.data.range(10, override_num_blocks=10).map_batches(
    sleep, batch_size=None
).materialize()

We can compare the behavior before and after this PR (video links).

Why are these changes needed?

Related issue number

Closes #46420

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@scottjlee added the 'go' label (add ONLY when ready to merge, run all tests) on Jul 12, 2024
@@ -273,7 +275,7 @@ def num_outputs_total(self) -> int:
             return self._estimated_num_output_bundles
         if len(self.input_dependencies) == 1:
             return self.input_dependencies[0].num_outputs_total()
Contributor commented:
  • should we remove the above if statement? this is the code logic that "assumes that each read task produces exactly one block"
  • also, I'm wondering whether we should make this method abstract, to force each subclass to provide a reasonable implementation.

scottjlee (Contributor, Author) replied on Jul 15, 2024:

should we remove the above if statement? this is the code logic that "assumes that each read task produces exactly one block"

Yeah, let me remove the if block, thanks.

In terms of forcing each operator to implement the method: I think the majority of operators would simply return self._estimated_num_output_bundles if available, and None otherwise. As a follow-up PR, I can refactor the per-operator logic that calculates self._estimated_num_output_bundles into the num_outputs_total() method (for example, move this logic into the num_outputs_total() method). Let me know if you'd rather I include it in this PR instead.

scottjlee (Contributor, Author) commented:

Actually, I included the necessary changes in this PR; it turned out to be simpler than I originally thought. For the default behavior, PhysicalOperator.num_outputs_total() returns self._estimated_num_output_bundles. Operators like AllToAllOperator, Limit, Union, and Zip have their own implementations that add some more logic.
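For illustration, an operator that combines several inputs (Union-like) might report the sum of its inputs' estimates and stay unknown until every input can provide one. This is only a sketch of that idea using assumed names, not the merged Ray code:

from typing import List, Optional

class UnionLike:
    def __init__(self, input_dependencies: List["UnionLike"]):
        self.input_dependencies = input_dependencies

    def num_outputs_total(self) -> Optional[int]:
        # A union emits roughly the sum of its inputs' output bundles, but
        # the total stays unknown (None) until every input has an estimate.
        totals = [dep.num_outputs_total() for dep in self.input_dependencies]
        if any(t is None for t in totals):
            return None
        return sum(totals)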

Contributor replied:

Looks much nicer. Can you update OutputSplitter as well?

scottjlee (Contributor, Author) replied:

Added logic to increment self._estimated_num_output_bundles in OutputSplitter._get_next_inner(). Let me know if you have different logic in mind.

-        if len(self.input_dependencies) == 1:
-            return self.input_dependencies[0].num_outputs_total()
-        raise AttributeError
+        return self._estimated_num_output_bundles
Contributor commented:
can you add a comment that subclasses should either override this method or update _estimated_num_output_bundles?
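Such a note on the default implementation might read roughly as follows (the wording and the Optional typing are assumptions, not the merged comment):

from typing import Optional

class PhysicalOperator:
    def __init__(self):
        self._estimated_num_output_bundles: Optional[int] = None

    def num_outputs_total(self) -> Optional[int]:
        # Subclasses should either override this method or keep
        # self._estimated_num_output_bundles up to date during execution;
        # otherwise the estimated total stays None (unknown).
        return self._estimated_num_output_bundles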

bveeramani (Member) left a comment:
Nice

@anyscalesam added the 'data' (Ray Data-related issues) and 'triage' (Needs triage, e.g. priority, bug/not-bug, and owning component) labels on Jul 16, 2024
@@ -91,6 +91,9 @@ def has_next(self) -> bool:
     def _get_next_inner(self) -> RefBundle:
         output = self._output_queue.popleft()
         self._metrics.on_output_dequeued(output)
+        if self._estimated_num_output_bundles is None:
+            self._estimated_num_output_bundles = 0
+        self._estimated_num_output_bundles += 1
Contributor commented:
I think it's better to just inherit num_outputs_total from the input op, because the output splitter doesn't change the number of blocks.
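A minimal sketch of that suggestion, simply forwarding the upstream operator's estimate (illustrative only; it may not match exactly what was merged):

from typing import Optional

class OutputSplitterLike:
    def __init__(self, input_op):
        self.input_dependencies = [input_op]

    def num_outputs_total(self) -> Optional[int]:
        # Splitting doesn't change how many blocks flow through, so reuse
        # the upstream operator's estimate (which may still be None).
        return self.input_dependencies[0].num_outputs_total()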

@scottjlee requested a review from raulchen on Jul 16, 2024 at 21:00
@raulchen merged commit abbc6f7 into ray-project:master on Jul 17, 2024
5 checks passed
Labels: data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests), triage (Needs triage, e.g. priority, bug/not-bug, and owning component)

Successfully merging this pull request may close these issues.

[Data] Progress bar estimates are initially incorrect if read tasks yield multiple outputs
4 participants