[data] drain buffer on finalize for block ref bundler #59019
Conversation
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
```python
                output_buffer_size += bundle_size
            else:
                remainder = self._bundle_buffer[idx:]
                break
```
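The role of the `break` can be shown with a hypothetical standalone sketch (the function and parameter names here are illustrative, not the actual Ray API): without the `break`, `remainder` is reassigned on every later iteration, so only the final slice of the buffer survives and everything in between is silently dropped.

```python
# Illustrative sketch, not the real BlockRefBundler implementation.
def take_next_bundle(buffer, sizes, target_size):
    """Split `buffer` into (output, remainder) without losing any bundle."""
    output, output_size = [], 0
    remainder = []
    for idx, (bundle, size) in enumerate(zip(buffer, sizes)):
        if output_size < target_size:
            output.append(bundle)
            output_size += size
        else:
            # Without this break, `remainder` would be reassigned on each
            # later iteration and end up as only the last slice of the
            # buffer, dropping every bundle between idx and the end.
            remainder = buffer[idx:]
            break
    return output, remainder
```

With the `break`, everything not emitted in `output` stays in `remainder`, so no bundle is lost.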
If I remove the break statement, the test below fails.
Code Review
This pull request addresses a bug in BlockRefBundler where bundles could be silently dropped during finalization. The fix involves adding a break statement in get_next_bundle to correctly handle bundle remainders. A new test case, test_block_ref_bundler_finalize_drains_all, has been added to verify that all buffered data is drained correctly upon finalization, preventing regressions. The changes are correct and the added test is comprehensive for the scenario it covers. The pull request is well-structured and effectively resolves the identified issue.
```python
            self._block_ref_bundler.add_bundle(refs)
            self._metrics.on_input_queued(refs)

        if self._block_ref_bundler.has_bundle():
```
Should we also use `while` here?
I don't think we need to change it (in this PR at least), because we would launch one task per available bundle, so switching to `while` could change the execution behavior.
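The `if` vs. `while` trade-off discussed here can be sketched with a toy dispatcher (all names below are illustrative, not the actual Ray operator code): `if` launches at most one task per input event, while `while` would drain every ready bundle at once.

```python
# Illustrative sketch of the dispatch difference; FakeBundler and the
# dispatch_* helpers are hypothetical, not the real operator API.
class FakeBundler:
    def __init__(self, bundles):
        self._bundles = list(bundles)

    def has_bundle(self):
        return bool(self._bundles)

    def get_next_bundle(self):
        return self._bundles.pop(0)

def dispatch_with_if(bundler, launch_task):
    # Current behavior: at most one task launched per call.
    if bundler.has_bundle():
        launch_task(bundler.get_next_bundle())

def dispatch_with_while(bundler, launch_task):
    # Alternative: drain every ready bundle immediately, launching one
    # task each; this changes how many tasks are in flight per call.
    while bundler.has_bundle():
        launch_task(bundler.get_next_bundle())
```

This is why the change was deferred: the `while` variant alters scheduling, not just correctness.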
@owenowenisme I'm going to solve this issue in this big PR #59093
Description
Context: #58694 (comment)
PR that changed this behavior: https://github.com/ray-project/ray/pull/52806/files
Before that PR, the `BlockRefBundler` could silently drop bundles on `finalize`. Due to the way we use it*, this is not actually reachable, but I still think we should fix it to make the behavior explicit. I added a `break` statement for when the bundle target size is met.

*Context: The current behavior of `TaskPoolMapOperator` and `ActorPoolMapOperator` is that a bundle is queued and the operator eagerly tries to launch a task with the bundled input. The bundled input always contains all the existing bundles in `BlockRefBundler` due to the behavior above (you can think of it as the `BlockRefBundler` never having time to accumulate a backlog of bundles, because once it has a ready bundle, a task is launched). So there are never remainders (see code) left over, and hence the else branch at map_operator.py:750 in `BlockRefBundler` is never reached.

Related issues
Additional information
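To make the drain-on-finalize invariant concrete, here is a minimal toy bundler (an illustrative sketch only; `ToyBundler` and its method names are hypothetical, not the real `BlockRefBundler` API): after adding is done, repeatedly pulling bundles while `has_bundle()` is true must return every buffered bundle, including an undersized final remainder.

```python
# Toy sketch of the intended invariant; not the real BlockRefBundler.
class ToyBundler:
    def __init__(self, min_rows):
        self._min_rows = min_rows
        self._buffer = []       # list of (bundle, num_rows) pairs
        self._finalized = False

    def add_bundle(self, bundle, num_rows):
        self._buffer.append((bundle, num_rows))

    def done_adding(self):
        self._finalized = True

    def has_bundle(self):
        buffered = sum(n for _, n in self._buffer)
        # After finalize, any leftover rows form a bundle even if they
        # are below the minimum size; nothing may be dropped.
        return buffered >= self._min_rows or (self._finalized and buffered > 0)

    def get_next_bundle(self):
        out, out_rows = [], 0
        remainder = []
        for idx, (bundle, n) in enumerate(self._buffer):
            if out_rows < self._min_rows:
                out.append(bundle)
                out_rows += n
            else:
                # Keep everything past the target size buffered.
                remainder = self._buffer[idx:]
                break
        self._buffer = remainder
        return out
```

Draining in a loop until `has_bundle()` returns false then yields exactly the bundles that were added, which is the property the new test checks.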