[Data] Fixing handling of renames in projection pushdown (#58033) #58037
## Description
This change correctly handles pushing renaming projections down into read ops that support projections (such as Parquet reads).

## Related issues
> Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples, screenshots, etc.

---------
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Code Review
This pull request refactors the projection pushdown logic to correctly handle column renames, which is a great improvement. The core idea is to distinguish simple projections (selects/renames) from complex ones and push down the rename map to the data source for simple cases, avoiding an extra MapBatches operator. The changes are well-structured, introducing a collapse_transitive_map utility for chained renames and updating the ParquetDatasource and logical operators accordingly. The logic seems sound, but I have one suggestion to improve the robustness and clarity of a new helper function.
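To make the chained-rename case concrete, here is a minimal sketch of what collapsing a transitive rename map could look like. It is not the implementation from this PR: the name `collapse_transitive_map_sketch` and the cycle handling are assumptions made for illustration only.

```python
from typing import Dict, Optional


def collapse_transitive_map_sketch(
    mapping: Optional[Dict[str, str]],
) -> Dict[str, str]:
    """Collapse chained renames such as {a: b, b: c} into {a: c}.

    Hypothetical illustration; the real helper in the PR may differ.
    """
    if not mapping:
        return {}

    targets = set(mapping.values())
    collapsed: Dict[str, str] = {}

    for source in mapping:
        # A key that is also some rename's target is treated as an
        # intermediate link of a chain, so it is not emitted on its own.
        if source in targets:
            continue

        current = mapping[source]
        seen = {source, current}
        # Follow the chain until the name is no longer renamed.
        while current in mapping:
            current = mapping[current]
            if current in seen:
                raise ValueError(f"Cyclic rename chain starting at {source!r}")
            seen.add(current)

        collapsed[source] = current

    return collapsed
```

With this sketch, `{"a": "b", "b": "c"}` collapses to `{"a": "c"}`, so a read op would only need to apply a single rename from the original column name to the final one.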
    def _combine_rename_map(
        prev_column_rename_map: Optional[Dict[str, str]],
        new_column_rename_map: Optional[Dict[str, str]],
    ):
        if not prev_column_rename_map:
            combined = new_column_rename_map
        elif not new_column_rename_map:
            combined = prev_column_rename_map
        else:
            combined = prev_column_rename_map | new_column_rename_map

        return collapse_transitive_map(combined)
This function is missing a return type hint. Based on its usage, it should be -> Dict[str, str].
Additionally, the combined variable can be None if one of the input rename maps is None. While collapse_transitive_map currently handles None input by returning {}, relying on this implicit behavior can be brittle. It's safer and clearer to ensure a dictionary is always passed.
I suggest adding the type hint and making the None handling explicit by using combined or {}.
     def _combine_rename_map(
         prev_column_rename_map: Optional[Dict[str, str]],
         new_column_rename_map: Optional[Dict[str, str]],
    -):
    +) -> Dict[str, str]:
         if not prev_column_rename_map:
             combined = new_column_rename_map
         elif not new_column_rename_map:
             combined = prev_column_rename_map
         else:
             combined = prev_column_rename_map | new_column_rename_map
    -    return collapse_transitive_map(combined)
    +    return collapse_transitive_map(combined or {})
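The point about explicit `None` handling can be illustrated with a self-contained sketch. Here `strict_collapse` is a hypothetical stand-in that, unlike the helper described in the review, does not tolerate `None`; it exists only to show why normalizing with `combined or {}` makes the caller independent of the callee's leniency.

```python
from typing import Dict, Optional


def strict_collapse(mapping: Dict[str, str]) -> Dict[str, str]:
    # Hypothetical stand-in for the collapsing helper: dict(None) raises
    # TypeError, so this function rejects a None argument.
    return dict(mapping)


def combine_rename_map(
    prev_column_rename_map: Optional[Dict[str, str]],
    new_column_rename_map: Optional[Dict[str, str]],
) -> Dict[str, str]:
    if not prev_column_rename_map:
        combined = new_column_rename_map
    elif not new_column_rename_map:
        combined = prev_column_rename_map
    else:
        # Dict union (Python 3.9+); entries from the newer map win on conflict.
        combined = prev_column_rename_map | new_column_rename_map

    # `combined` is still None when both inputs are None, so normalize it
    # before handing it to a helper that expects a real dict.
    return strict_collapse(combined or {})
```

Without the `or {}`, calling `combine_rename_map(None, None)` would pass `None` into `strict_collapse` and fail; with it, the function always returns a dictionary, which matches the suggested `-> Dict[str, str]` annotation.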
Description
Cherry-pick of #58033
Related issues
Additional information