-
Notifications
You must be signed in to change notification settings - Fork 7.1k
[Data] Added support for projection pushdown into Parquet reads #56500
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Data] Added support for projection pushdown into Parquet reads #56500
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces an important optimization, ProjectionPushdown, which serves two purposes: fusing consecutive Project operators and pushing column projections into ParquetDatasource during reads. The implementation for fusing Project operators is well-covered with extensive tests. However, I've identified a few critical issues. The primary feature of pushing projections into Parquet reads is not tested at all. Additionally, the implementation mutates the datasource object in-place, which is a significant issue for an optimizer rule and can lead to incorrect behavior. I've left detailed comments on these points.
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
5638317 to
f80507c
Compare
Fixed typo Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
| # `read_schema` is the schema object that will be used to perform | ||
| # read operations. | ||
| # It should be None, unless user has specified the schema or columns. | ||
| # We don't use the inferred schema for read, because we infer the schema based | ||
| # on the first file. Thus, files with different schemas will end up producing | ||
| # blocks with wrong schema. | ||
| # See https://github.com/ray-project/ray/issues/47960 for more context. | ||
| read_schema = schema | ||
| inferred_schema = _infer_schema( | ||
| pq_ds, schema, columns, partitioning, _block_udf | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to defer derivation of the target schema until the actual reading starts to make sure that pushed down projections are picked up appropriately
srinathk10
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor comments. LGTM otherwise.
| return True | ||
|
|
||
|
|
||
| class LogicalOperatorSupportsProjectionPushdown(LogicalOperator): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: May be move these into LogicalOperator, bc projection pushdown can be supported by all LogicalOperator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, purposefully abstracted it to avoid polluting shared interface.
We actually don't need to do pushdown explicitly in other ops (like Map) simply b/c it will be done by Operator fusion already.
| assert ds.count() == partitioned_ds.count() | ||
|
|
||
|
|
||
| def test_projection_pushdown_on_count(ray_start_regular_shared, temp_dir): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Missing project pushdown
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's done internally inside count itself
|
|
||
| name: str | ||
| expressions_list: List[Dict[str, str]] # List of {name: expression_desc} | ||
| expected_levels: int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Level terminology is a new. May be Sequence is a better termilogy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just cloning tests from RT as is
| def get_current_projection(self) -> Optional[List[str]]: | ||
| return None | ||
|
|
||
| def apply_projection(self, columns: Optional[List[str]]) -> LogicalOperator: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO for later: This should include exclude columns as well
| def get_current_projection(self) -> Optional[List[str]]: | ||
| return None | ||
|
|
||
| def apply_projection(self, columns: Optional[List[str]]) -> LogicalOperator: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please add a TODO here and tag me to use expressions here?
…project#56500) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? Adding support for projection pushdown into `Read` ops reading from parquet tables. Changes --- - Adding `ProjectionPushdown` optimization rule - Abstracting `LogicalOperatorSupportsProjectionPushdown`, etc interfaces to implement projection pushdown for various Read ops as well as `Datasource`s - Added tests for projection push-down ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Alexey Kudinkin <ak@anyscale.com> Signed-off-by: Zhiqiang Ma <zhiqiang.ma@intel.com>
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? Adding support for projection pushdown into `Read` ops reading from parquet tables. Changes Signed-off-by: Alexey Kudinkin <ak@anyscale.com> --- - Adding `ProjectionPushdown` optimization rule - Abstracting `LogicalOperatorSupportsProjectionPushdown`, etc interfaces to implement projection pushdown for various Read ops as well as `Datasource`s - Added tests for projection push-down ## Related issue number <!-- For example: "Closes #1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…project#56500) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? Adding support for projection pushdown into `Read` ops reading from parquet tables. Changes --- - Adding `ProjectionPushdown` optimization rule - Abstracting `LogicalOperatorSupportsProjectionPushdown`, etc interfaces to implement projection pushdown for various Read ops as well as `Datasource`s - Added tests for projection push-down ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Alexey Kudinkin <ak@anyscale.com> Signed-off-by: zac <zac@anyscale.com>
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? Adding support for projection pushdown into `Read` ops reading from parquet tables. Changes --- - Adding `ProjectionPushdown` optimization rule - Abstracting `LogicalOperatorSupportsProjectionPushdown`, etc interfaces to implement projection pushdown for various Read ops as well as `Datasource`s - Added tests for projection push-down ## Related issue number <!-- For example: "Closes #1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Alexey Kudinkin <ak@anyscale.com> Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
…project#56500) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? Adding support for projection pushdown into `Read` ops reading from parquet tables. Changes --- - Adding `ProjectionPushdown` optimization rule - Abstracting `LogicalOperatorSupportsProjectionPushdown`, etc interfaces to implement projection pushdown for various Read ops as well as `Datasource`s - Added tests for projection push-down ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Alexey Kudinkin <ak@anyscale.com> Signed-off-by: Marco Stephan <marco@magic.dev>
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? Adding support for projection pushdown into `Read` ops reading from parquet tables. Changes --- - Adding `ProjectionPushdown` optimization rule - Abstracting `LogicalOperatorSupportsProjectionPushdown`, etc interfaces to implement projection pushdown for various Read ops as well as `Datasource`s - Added tests for projection push-down ## Related issue number <!-- For example: "Closes #1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Alexey Kudinkin <ak@anyscale.com> Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? Adding support for projection pushdown into `Read` ops reading from parquet tables. Changes --- - Adding `ProjectionPushdown` optimization rule - Abstracting `LogicalOperatorSupportsProjectionPushdown`, etc interfaces to implement projection pushdown for various Read ops as well as `Datasource`s - Added tests for projection push-down ## Related issue number <!-- For example: "Closes #1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Alexey Kudinkin <ak@anyscale.com> Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
…project#56500) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? Adding support for projection pushdown into `Read` ops reading from parquet tables. Changes --- - Adding `ProjectionPushdown` optimization rule - Abstracting `LogicalOperatorSupportsProjectionPushdown`, etc interfaces to implement projection pushdown for various Read ops as well as `Datasource`s - Added tests for projection push-down ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Original PR #56500 by alexeykudinkin Original: ray-project/ray#56500
…nto Parquet reads Merged from original PR #56500 Original: ray-project/ray#56500
…project#56500) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? Adding support for projection pushdown into `Read` ops reading from parquet tables. Changes --- - Adding `ProjectionPushdown` optimization rule - Abstracting `LogicalOperatorSupportsProjectionPushdown`, etc interfaces to implement projection pushdown for various Read ops as well as `Datasource`s - Added tests for projection push-down ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…project#56500) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? Adding support for projection pushdown into `Read` ops reading from parquet tables. Changes --- - Adding `ProjectionPushdown` optimization rule - Abstracting `LogicalOperatorSupportsProjectionPushdown`, etc interfaces to implement projection pushdown for various Read ops as well as `Datasource`s - Added tests for projection push-down ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Alexey Kudinkin <ak@anyscale.com> Signed-off-by: Future-Outlier <eric901201@gmail.com>
Why are these changes needed?
Adding support for projection pushdown into
Readops reading from parquet tables.Changes
ProjectionPushdownoptimization ruleLogicalOperatorSupportsProjectionPushdown, etc interfaces to implement projection pushdown for various Read ops as well asDatasourcesRelated issue number
Checks
git commit -s) in this PR.scripts/format.shto lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/under thecorresponding
.rstfile.