Conversation

@owenowenisme
Member

@owenowenisme owenowenisme commented Sep 15, 2025

Why are these changes needed?

Before making the zip operator a streaming operator, we first make it accept multiple inputs.

The Zip operator can now be used with more than two datasets:

>>> import ray
>>> ds1 = ray.data.range(5)
>>> ds2 = ray.data.range(5)
>>> ds3 = ray.data.range(5)
>>> ds1.zip(ds2, ds3).take_batch()
{'id': array([0, 1, 2, 3, 4]), 'id_1': array([0, 1, 2, 3, 4]), 'id_2': array([0, 1, 2, 3, 4])}                                                                         
>>> ds1.zip(ds2, ds3).take_all()
[{'id': 0, 'id_1': 0, 'id_2': 0}, {'id': 1, 'id_1': 1, 'id_2': 1}, {'id': 2, 'id_1': 2, 'id_2': 2}, {'id': 3, 'id_1': 3, 'id_2': 3}, {'id': 4, 'id_1': 4, 'id_2': 4}]

Related issue number

#56504

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

owenowenisme and others added 4 commits September 13, 2025 09:27
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
@owenowenisme owenowenisme marked this pull request as ready for review September 15, 2025 08:22
@owenowenisme owenowenisme requested a review from a team as a code owner September 15, 2025 08:22
@owenowenisme owenowenisme changed the title Data/make zip operator accept multiple input [Data] Make zip operator accept multiple input Sep 15, 2025
@ray-gardener ray-gardener bot added data Ray Data-related issues community-contribution Contributed by the community labels Sep 15, 2025
@owenowenisme
Member Author

owenowenisme commented Sep 15, 2025

@richardliaw @gvspraveen @alexeykudinkin PTAL, thanks!

@goutamvenkat-anyscale
Contributor

Thanks for your contribution! Overall the first change looks good. Just a few minor comments.

Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
…rable dataset size

Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
@goutamvenkat-anyscale goutamvenkat-anyscale added the go add ONLY when ready to merge, run all tests label Sep 16, 2025
python/ray/data/_internal/execution/operators/zip_operator.py
DOC101: Method `ZipOperator.__init__`: Docstring contains fewer arguments than in function signature.
DOC103: Method `ZipOperator.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [data_context: DataContext, left_input_op: PhysicalOperator]. Arguments in the docstring but not in the function signature: [left_input_ops: ].
DOC103: Method `ZipOperator.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [*input_ops: PhysicalOperator, data_context: DataContext]. Arguments in the docstring but not in the function signature: [input_ops: ].
Contributor

Typically, when there are changes to the baseline, we want to fix them. Is this possible to fix, or is it a bug in pydoclint?

Member Author

@owenowenisme owenowenisme Sep 17, 2025

Yes, I'm aware of this, but I thought it was intended. Other operators don't document data_context in their docstrings either, e.g. UnionOperator:

    def __init__(
        self,
        data_context: DataContext,
        *input_ops: PhysicalOperator,
    ):
        """Create a UnionOperator.

        Args:
            input_ops: Operators generating input data for this operator to union.
        """

Contributor

Ah, in the future we can just add data_context to the docstring. I think that's a good thing to fix.
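
A minimal sketch of what a fully documented signature could look like, assuming Google-style docstrings. The DOC103 baseline entry above lists the variadic parameter as `*input_ops`, so the sketch documents it with the star. `ZipOperatorSketch` and the two stub classes are placeholders for illustration, not the actual Ray classes.

```python
class DataContext:
    """Placeholder for ray.data.DataContext (illustration only)."""


class PhysicalOperator:
    """Placeholder for Ray Data's PhysicalOperator (illustration only)."""


class ZipOperatorSketch:
    """Hypothetical stand-in showing only the signature and docstring."""

    def __init__(
        self,
        data_context: DataContext,
        *input_ops: PhysicalOperator,
    ):
        """Create a ZipOperatorSketch.

        Args:
            data_context: Execution context shared by the operators.
            *input_ops: Operators generating input data for this operator to zip.
        """
        # Store the arguments so the sketch is usable on its own.
        self.data_context = data_context
        self.input_ops = input_ops
```

Both names in the signature now appear under `Args`, which is what the DOC101 and DOC103 messages ask for.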

if num_outputs is None:
    num_outputs = input_num_outputs
else:
    num_outputs = max(num_outputs, input_num_outputs)
Contributor

This should be min, not max

Contributor

Let's make sure this is covered with tests also

Member Author

@owenowenisme owenowenisme Sep 17, 2025

Thanks! On second thought, neither max nor min seems accurate, right? The number of input blocks for each output should be the same (to perform a zip), and we already assert:

        total_left_rows = sum(left_block_rows)
        total_right_rows = sum(right_block_rows)
        if total_left_rows != total_right_rows:
            raise ValueError(
                "Cannot zip datasets of different number of rows: "
                f"{total_left_rows}, {total_right_rows}"
            )

Maybe we don't actually need to calculate num_outputs here?
Correct me if I'm wrong, thanks!

Member Author

Even if we use min for the number of output rows right now, this logic will need to change when user-directed dropping or padding is introduced.

Padding would require using max, while dropping would use min, so calculating the number of rows here is redundant.

Discussed with @gvspraveen offline.
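
To make the trade-off in this thread concrete, here is a rough sketch of how the choice between min and max depends on the zip policy. The helper name `estimate_zip_num_rows` and the `mode` values are hypothetical and exist only for illustration; they are not part of Ray Data.

```python
from typing import Optional, Sequence


def estimate_zip_num_rows(
    input_num_rows: Sequence[Optional[int]],
    mode: str = "strict",
) -> Optional[int]:
    """Estimate the row count of a zip over several inputs.

    Hypothetical helper: the modes mirror the policies discussed in the
    review thread and do not correspond to an existing Ray Data option.
    """
    # If any input size is unknown, the output size is unknown too.
    if not input_num_rows or any(n is None for n in input_num_rows):
        return None
    if mode == "strict":
        # Current behavior: all inputs must have the same number of rows,
        # so min and max would return the same value anyway.
        first = input_num_rows[0]
        if any(n != first for n in input_num_rows):
            raise ValueError(
                "Cannot zip datasets of different number of rows: "
                f"{list(input_num_rows)}"
            )
        return first
    if mode == "drop":
        # Dropping extra rows truncates to the shortest input.
        return min(input_num_rows)
    if mode == "pad":
        # Padding extends every input to the longest one.
        return max(input_num_rows)
    raise ValueError(f"Unknown mode: {mode!r}")
```

Under the strict policy the result is simply the shared row count, which is why neither min nor max adds information today.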

if num_rows is None:
    num_rows = input_num_rows
else:
    num_rows = max(num_rows, input_num_rows)
Contributor

Same comment

num_outputs = input.estimated_num_outputs()
if num_outputs is None:
    return None
total_num_outputs = max(total_num_outputs, num_outputs)
Contributor

Same comment

Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
@owenowenisme owenowenisme marked this pull request as draft September 25, 2025 11:59
@owenowenisme owenowenisme marked this pull request as ready for review September 25, 2025 16:07
@richardliaw richardliaw merged commit 09a9970 into ray-project:master Sep 25, 2025
8 checks passed
else:
self._right_buffer.append(refs)
self._metrics.on_input_queued(refs)
assert 0 <= input_index <= len(self._input_dependencies), input_index

Bug: Index Assertion Error Causes Buffer Access Failure

The assertion for input_index in _add_input_inner allows an out-of-bounds index equal to len(self._input_dependencies). This can lead to an IndexError when accessing self._input_buffers.
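
A small sketch of the bound check this comment is asking for, assuming one buffer per input. `InputBuffers` and `add_input` are hypothetical names used only to isolate the off-by-one; they are not the operator's real classes or methods.

```python
from typing import Any, List


class InputBuffers:
    """Hypothetical stand-in for an operator holding one buffer per input."""

    def __init__(self, num_inputs: int) -> None:
        self._input_buffers: List[List[Any]] = [[] for _ in range(num_inputs)]

    def add_input(self, refs: Any, input_index: int) -> None:
        # Strict upper bound: valid indices are 0 .. len - 1. Using `<=`
        # would let input_index == len through and fail later on lookup.
        assert 0 <= input_index < len(self._input_buffers), input_index
        self._input_buffers[input_index].append(refs)
```

With the strict `<`, an index equal to the number of inputs fails the assertion up front instead of surfacing as an IndexError on the buffer access.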

elliot-barn pushed a commit that referenced this pull request Sep 27, 2025
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
dstrodtman pushed a commit that referenced this pull request Oct 6, 2025
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
justinyeh1995 pushed a commit to justinyeh1995/ray that referenced this pull request Oct 20, 2025
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
snorkelopstesting1-a11y pushed a commit to snorkel-marlin-repos/ray-project_ray_pr_56524_2dd3e4b0-709f-4113-bec6-7d0226f2ba2a that referenced this pull request Oct 22, 2025
Original PR #56524 by owenowenisme
Original: ray-project/ray#56524
snorkelopstesting3-bot added a commit to snorkel-marlin-repos/ray-project_ray_pr_56524_2dd3e4b0-709f-4113-bec6-7d0226f2ba2a that referenced this pull request Oct 22, 2025
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Labels

  • community-contribution: Contributed by the community
  • data: Ray Data-related issues
  • go: add ONLY when ready to merge, run all tests

5 participants