Add write operator in new logical plan #32440
Conversation
Signed-off-by: jianoaix <iamjianxiao@gmail.com>
```python
super().__init__(
    "Write",
    input_op,
    fn=lambda x: x,
```
It's kind of awkward that we can't support the write operator without a dummy `fn`. Could we mark `fn` as optional in `AbstractMap`?
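For what it's worth, a minimal sketch of what that could look like. The `LogicalOperator` stub and all signatures here are illustrative, not the actual Ray Data internals:

```python
from typing import Callable, List, Optional


class LogicalOperator:
    # Stub standing in for Ray Data's logical-plan base class.
    def __init__(self, name: str, input_dependencies: List["LogicalOperator"]):
        self._name = name
        self._input_dependencies = input_dependencies


class AbstractMap(LogicalOperator):
    def __init__(
        self,
        name: str,
        input_op: LogicalOperator,
        fn: Optional[Callable] = None,  # optional, so Write needn't pass a dummy fn
    ):
        super().__init__(name, [input_op])
        self._fn = fn
```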
Shouldn't `fn` be the write function for the data here?
Like `fn=datasource.write`?
The `fn` is the UDF, and `datasource.write` is the `transform_fn`, which is internal. Maybe we should rename the user-supplied `fn` to `udf` for clarity.
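To illustrate the distinction being made here (all names are hypothetical, not Ray Data's actual internals): the user-supplied UDF operates on user-visible data, while the internal `transform_fn` operates on blocks and is what the physical operator actually runs.

```python
from typing import Callable, Iterable, List

Block = List  # stand-in block type for the sketch


def make_udf_transform_fn(udf: Callable[[Block], Block]) -> Callable:
    # Internal transform for a map: applies the user's UDF to each block.
    def transform_fn(blocks: Iterable[Block]) -> Iterable[Block]:
        for block in blocks:
            yield udf(block)
    return transform_fn


def make_write_transform_fn(write: Callable[[Block], None]) -> Callable:
    # Internal transform for a write: calls datasource.write, no UDF involved.
    def transform_fn(blocks: Iterable[Block]) -> Iterable[Block]:
        for block in blocks:
            write(block)
            yield block
    return transform_fn
```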
I see. I think you actually want a hierarchy of `AbstractMap > BatchMap` and `AbstractMap > Write` then? Both generate maps, just with different strategies.
If we want a logical-level base operator for all map-like operators, then we should probably introduce a new base `AbstractMap`, with the existing `AbstractMap` renamed to `AbstractUDFMap` (rough code sketch after the outline):
AbstractMap(ray_remote_args)
- Read
- Write
- AbstractUDFMap(fn, compute)
  - MapBatches
  - MapRows
  - Filter
  - FlatMap
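For concreteness, a rough sketch of that hierarchy. Only the class names come from the outline above; the constructor details are elided and illustrative:

```python
from typing import Callable, Dict, List, Optional


class LogicalOperator:
    def __init__(self, name: str, input_dependencies: List["LogicalOperator"]):
        self._name = name
        self._input_dependencies = input_dependencies


class AbstractMap(LogicalOperator):
    """Base for all map-like logical ops; carries only ray_remote_args."""

    def __init__(
        self,
        name: str,
        input_op: Optional[LogicalOperator],  # None for Read, which has no upstream
        ray_remote_args: Optional[Dict] = None,
    ):
        super().__init__(name, [input_op] if input_op else [])
        self._ray_remote_args = ray_remote_args or {}


class Read(AbstractMap):
    ...


class Write(AbstractMap):
    ...


class AbstractUDFMap(AbstractMap):
    """Adds the user-supplied UDF and compute strategy on top of AbstractMap."""

    def __init__(
        self,
        name: str,
        input_op: LogicalOperator,
        fn: Callable,
        compute: Optional[str] = None,
        ray_remote_args: Optional[Dict] = None,
    ):
        super().__init__(name, input_op, ray_remote_args)
        self._fn = fn
        self._compute = compute


class MapBatches(AbstractUDFMap):
    ...


class MapRows(AbstractUDFMap):
    ...


class Filter(AbstractUDFMap):
    ...


class FlatMap(AbstractUDFMap):
    ...
```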
Uplifting `AbstractMap` looks good to me, and it seems to correspond to the `MapOperator` on the physical operator side.
I'll leave it to you to decide, since I don't have much context on the fusion/planner code yet.
The only point I want to make is that I don't think this class really needs to exist, except as an alias: `Write ~= MapBatches(batch_size=None, fn=datasource.write_fn, ray_remote_args={})`. But this may just be a non-useful shortcut.
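As a hypothetical illustration of that alias view, reusing the `MapBatches` signature from the hierarchy sketch above (the real `MapBatches` would presumably also take `batch_size`):

```python
def write_as_alias(input_op, datasource):
    # Write ~= a batch map whose "UDF" is the datasource's write function.
    return MapBatches("Write", input_op, fn=datasource.write)
```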
I feel quite strongly that that's a decision for the planner to make. There's limited utility in making that translation/assertion at the logical level via operator abstraction hierarchies; we should only create abstraction hierarchies at the logical operator level if they're useful for the optimization rules or the planner. That shortcut only becomes useful at the planning level, when we're creating the physical `MapOperator`.
Btw, let's check whether the write operator breaks the current `randomize_blocks_order` reordering.
```python
@@ -518,6 +518,12 @@ def test_read_map_chain_operator_fusion_e2e(ray_start_regular_shared, enable_opt
        assert name in ds.stats()


def test_write_operator(ray_start_regular_shared, enable_optimizer, tmp_path):
```
Can we also add a unit test for the `Write` operator similar to the others, such as `test_sort_operator` below?
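Something along these lines, perhaps. This mirrors the shape of the existing operator tests; the exact constructor and planner signatures here are assumptions:

```python
def test_write_operator(ray_start_regular_shared, enable_optimizer):
    # Assumes the same imports as the surrounding test file
    # (Planner, LogicalPlan, Read, Write, ParquetDatasource, MapOperator).
    planner = Planner()
    datasource = ParquetDatasource()
    read_op = Read(datasource)
    op = Write(read_op, datasource)
    plan = LogicalPlan(op)
    physical_op = planner.plan(plan).dag

    assert op.name == "Write"
    assert isinstance(physical_op, MapOperator)
    assert len(physical_op.input_dependencies) == 1
```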
```python
@@ -119,6 +120,26 @@ def __init__(
    )


class Write(AbstractMap):
```
@c21 @jianoaix Should `Write` derive from `AbstractMap`, or should it be a standalone logical operator like `Read`? https://github.com/ray-project/ray/blob/2248ea602fbf6c53db1c5afc58f8bd386a66e1de/python/ray/data/_internal/logical/operators/read_operator.py
Then we could avoid this awkwardness with `fn` and the like. I don't see a compelling reason for `Write` to derive from `AbstractMap`, on first pass.
@clarkzinzow - I think if we make `Write` not an `AbstractMap`, we need to add some extra code during planning (as in the current code).
Would it break anything for operator fusion and zero-copy batching? If not, I'm also in favor of making `Write` extend `LogicalOperator` directly, because `Read` does that, and we can add a separate `plan_write_op`.
@c21 Yeah, I think it's worth breaking out the separate planning bit, and it shouldn't break anything with operator fusion or zero-copy batching; it will actually line up better with the latter!
I have this bit refactored to make the dispatch easier to centralize beyond just the `AbstractMap` ops: https://github.com/ray-project/ray/pull/32178/files#diff-4caa8ddd8103dd8f8d6a3e8c1237aec4eaa168a81dc914ae83b4f6042d68a1da
Cool, then +1 to making `Write` extend `LogicalOperator` directly.
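For concreteness, a hedged sketch of that direction: `Write` extends `LogicalOperator` directly (like `Read`), and the planner gets a dedicated `plan_write_op`. Aside from the names already discussed in this thread, everything here (the `generate_write_fn` helper, the `MapOperator` constructor, the block handling) is illustrative; it also leans on the `LogicalOperator` stub from the earlier sketches:

```python
from typing import Any, Callable, Iterable


def generate_write_fn(datasource: "Datasource", **write_args) -> Callable:
    # Hypothetical helper: builds the blocks -> write transform for the planner.
    def transform_fn(blocks: Iterable[Any]) -> Iterable[Any]:
        for block in blocks:
            datasource.write(block, **write_args)
            yield block
    return transform_fn


class Write(LogicalOperator):
    def __init__(self, input_op: LogicalOperator, datasource: "Datasource", **write_args):
        super().__init__("Write", [input_op])
        self._datasource = datasource
        self._write_args = write_args


def plan_write_op(op: Write, input_physical_dag) -> "MapOperator":
    # The planner lowers the logical Write to a physical MapOperator whose
    # transform calls datasource.write on incoming blocks (the MapOperator
    # constructor here is a stand-in, not the actual physical-operator API).
    transform_fn = generate_write_fn(op._datasource, **op._write_args)
    return MapOperator(transform_fn, input_physical_dag, name="Write")
```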
Yeah, I was feeling the `fn` was weird to have, since it's not relevant for `Write`.
I had local changes not pushed here; I'll make a followup.
With the new `write` added (from #32015 and #32440), Ray Data intends to support both the `write` and `do_write` functions for now. The check currently uses `hasattr()` to ensure the datasource object has a `write` method before using it. However, this is insufficient for a custom datasource that inherits from `Datasource`, since `Datasource` has a `write` method implemented: if the custom datasource only implements `do_write`, `hasattr(datasource, "write")` will still return True, because `hasattr()` detects methods inherited from the base class. The solution is to check whether the `write` method was overridden from `Datasource.write`; any class that has not overridden `write` will have the equality check return True.
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
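A minimal sketch of that override check, assuming the public `Datasource` base class (the helper name is illustrative):

```python
from ray.data.datasource import Datasource


def implements_write(datasource: Datasource) -> bool:
    # hasattr(datasource, "write") is always True here, because `write` is
    # inherited from Datasource; compare against the base implementation
    # instead, so only genuine overrides count.
    return type(datasource).write is not Datasource.write
```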
Why are these changes needed?
This is a followup to #32015.
Related issue number
Checks
- I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.