[data] New executor backend [3/n] --- Add basic operators impl #31305
Conversation
Signed-off-by: Eric Liang <ekhliang@gmail.com>
assert _take_outputs(op) == [[i] for i in range(10)]


def test_map_operator_ray_args(shutdown_only):
Debating whether it's worth mocking out the Ray API here to speed these tests up a bit. Maybe it's not that important, since the bulk of the testing will be for StreamingExecutor, which we can write separate mocks for.
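For reference, a minimal sketch of what such a mock might look like: a monkeypatched `ray.remote`/`ray.get` pair that runs task functions inline so operator tests skip cluster startup. The `_InlineRef` wrapper and the `inline_ray` fixture name are hypothetical, not part of this PR:

```python
import pytest


class _InlineRef:
    """Hypothetical stand-in for an ObjectRef holding an eagerly computed value."""

    def __init__(self, value):
        self.value = value


@pytest.fixture
def inline_ray(monkeypatch):
    # Run "remote" functions synchronously and wrap results so ray.get works.
    def fake_remote(fn=None, **ray_opts):
        def wrap(f):
            class _Handle:
                @staticmethod
                def remote(*args, **kwargs):
                    return _InlineRef(f(*args, **kwargs))

                @staticmethod
                def options(**_opts):
                    return _Handle

            return _Handle

        # Support both @ray.remote and @ray.remote(num_cpus=...) usage.
        return wrap(fn) if fn is not None else wrap

    def fake_get(ref):
        refs = ref if isinstance(ref, list) else [ref]
        values = [r.value for r in refs]
        return values if isinstance(ref, list) else values[0]

    monkeypatch.setattr("ray.remote", fake_remote)
    monkeypatch.setattr("ray.get", fake_get)
```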
    Supported strategies: {TaskPoolStrategy, ActorPoolStrategy}.
    """
    return self._strategy
Hmm, I wonder if we can keep the implementation details of the compute strategy, Ray remote args, etc. outside of the operators? It could be cleaner if we pass in the ray.remote Callable instead of the worker's Callable as the transform_fn, but I'm not sure if this will work, so I'll leave it up to you.
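To make the suggestion concrete, a rough sketch of the alternative wiring: the caller (or a future optimizer layer) wraps the transform before constructing the operator, so the operator never touches ComputeStrategy or remote args. The names here (`make_remote_transform`, the commented-out `MapOperator` usage) are illustrative, not this PR's actual API:

```python
from typing import Any, Callable, Iterator, List

import ray


def make_remote_transform(
    transform_fn: Callable[[List[Any]], Iterator[Any]],
    **remote_args,
) -> Callable[..., Any]:
    """Wrap a plain block transform into a Ray task submitter.

    The operator would call the returned function and receive ObjectRefs,
    without ever invoking ray.remote itself.
    """
    if remote_args:
        remote_fn = ray.remote(**remote_args)(transform_fn)
    else:
        remote_fn = ray.remote(transform_fn)

    def submit(*blocks):
        return remote_fn.remote(*blocks)

    return submit


# Hypothetical usage: strategy/remote-arg details stay outside the operator.
# op = MapOperator(make_remote_transform(my_fn, num_cpus=1), input_op)
```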
Yeah, I have a TODO on line 78 to clean this up in the future. I'm hoping ComputeStrategy can turn into a simple dataclass once we migrate fully to the new backend. For now, I avoided doing this refactoring to keep the changes self-contained.
About the callable: I think that's possible, but it's probably also easier to do once we have the logical optimization layer in place (the optimizer could generate the ray.remote callable).
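As a rough illustration of that end state: ComputeStrategy reduced to plain configuration data that an optimizer consumes, rather than something the operators execute. This is an assumed sketch of the direction described above, not code from this PR:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class TaskPoolStrategy:
    """Run each transform as an independent Ray task."""

    ray_remote_args: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ActorPoolStrategy:
    """Run transforms on a pool of long-lived Ray actors."""

    min_size: int = 1
    max_size: Optional[int] = None
    ray_remote_args: Dict[str, Any] = field(default_factory=dict)
```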
Looks great!
I'll hold this open until EOD for more comments.
    input_op: Operator generating input data for this op.
    name: The name of this operator.
    compute_strategy: Customize the compute strategy for this op.
    min_rows_per_batch: The number of rows to gather per batch passed to the
Should we name it min_rows_per_fn_call? "batch" is kind of confusing here, as this is neither the user-facing batch in map_batches nor the zero-copy batch execution we'll introduce later.
Batch seems clearer to me: it's basically the same as the user-facing batch size.
I wonder if we should keep using "target_rows_per_batch", since there is no guarantee of a "min" here. We should also clarify that the target may not be met when there aren't enough rows.
I don't think so; the previous naming was very confusing for me. The new one is clear in intent.
@ericl But that intent is incorrect: this is a target to get near to, not a minimum/floor. We add blocks to a bundle up to this target size, but we purposefully do not exceed it, so this is definitely not a minimum.
Alright, let me rename this to min_rows_per_bundle then. I don't think it's possible to be precisely unambiguous, and I'd prefer we keep the "min" intent, which is the big picture.
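To make the semantics this thread is debating concrete, here is an assumed sketch of block bundling under the reading that matches the final name: whole blocks are accumulated until the running row count reaches the threshold, so every emitted bundle except possibly the last has at least that many rows (the "min" reading), while individual bundles can overshoot because blocks are never split. Names and structure are illustrative, not the PR's exact implementation:

```python
from typing import Any, List, Optional


def bundle_blocks(
    blocks: List[Any],
    num_rows: List[int],
    min_rows_per_bundle: Optional[int],
) -> List[List[Any]]:
    """Group whole blocks into bundles of at least `min_rows_per_bundle` rows.

    Blocks are never split, so a bundle can exceed the threshold; only the
    final bundle may fall short if the input runs out of rows.
    """
    if min_rows_per_bundle is None:
        return [[b] for b in blocks]  # No bundling: pass blocks through 1:1.

    bundles: List[List[Any]] = []
    current: List[Any] = []
    current_rows = 0
    for block, n in zip(blocks, num_rows):
        current.append(block)
        current_rows += n
        if current_rows >= min_rows_per_bundle:
            bundles.append(current)
            current, current_rows = [], 0
    if current:  # Leftover rows below the threshold still get emitted.
        bundles.append(current)
    return bundles
```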
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Mostly nits; the only potential blocker in my mind is the question around the block bundling logic: it appears to be dropping empty blocks, which I don't think matches the current Datasets behavior.
Co-authored-by: Clark Zinzow <clarkzinzow@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
        self._obj_store_mem_peak: int = 0

    def add_input(self, bundle: RefBundle) -> None:
        if self._min_rows_per_bundle is None:
I ended up putting this back in order to enable empty-block propagation.
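A rough sketch of what the restored path might look like: when bundling is disabled, add_input forwards each incoming bundle untouched, so zero-row blocks propagate downstream instead of being filtered out during bundling. This is an assumed illustration with hypothetical names (`BundlingInputQueue`, `ready`), not the PR's exact code:

```python
from typing import Any, List, Optional


class BundlingInputQueue:
    """Hypothetical stand-in for the operator's input-buffering logic."""

    def __init__(self, min_rows_per_bundle: Optional[int]):
        self._min_rows_per_bundle = min_rows_per_bundle
        self._buffer: List[Any] = []
        self._buffered_rows = 0
        self.ready: List[List[Any]] = []  # Bundles ready to submit as tasks.

    def add_input(self, bundle: Any, num_rows: int) -> None:
        if self._min_rows_per_bundle is None:
            # Pass-through branch: every bundle, including ones made of
            # zero-row blocks, is forwarded rather than silently dropped.
            self.ready.append([bundle])
            return
        # Otherwise accumulate toward the row threshold before submitting.
        self._buffer.append(bundle)
        self._buffered_rows += num_rows
        if self._buffered_rows >= self._min_rows_per_bundle:
            self.ready.append(self._buffer)
            self._buffer, self._buffered_rows = [], 0
```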
Updated; the main change was removing the circular dependency between the operator impl and the wrapper operator.
Add the initial operator implementations. This is split out from #30903
(ray-project#31305) Add the initial operator implementations. This is split out from ray-project#30903.
Signed-off-by: tmynn <hovhannes.tamoyan@gmail.com>
Why are these changes needed?
Add the initial operator implementations.
This is split out from #30903
TODO: