
Conversation

@goutamvenkat-anyscale
Contributor

Description

When a dataset is Hive-partitioned, the partition columns don't reside in the physical schema of the Parquet files, so projection and predicate pushdown for that subset of columns can't be pushed into the read layer. This change filters those columns out before pushing down.
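
A minimal sketch of the idea (the helper name and shapes are illustrative, not the actual Ray Data internals): split a projection into data columns, which can be pushed into the Parquet reader, and partition columns, which exist only in the file paths and are attached after the read.

```python
from typing import List, Set, Tuple

def split_projection(
    requested: List[str], partition_cols: Set[str]
) -> Tuple[List[str], List[str]]:
    """Split requested columns into (data_columns, partition_columns)."""
    data = [c for c in requested if c not in partition_cols]
    parts = [c for c in requested if c in partition_cols]
    return data, parts

# Only the data columns are pushed into the physical Parquet read; the
# partition columns are reconstructed from Hive directory names
# (e.g. ".../year=2024/part-0.parquet").
data, parts = split_projection(["id", "value", "year"], {"year"})
assert data == ["id", "value"] and parts == ["year"]
```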

Related issues

Fixes #58714

Additional information

Optional: Add implementation details, API changes, usage examples, screenshots, etc.

Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale requested a review from a team as a code owner November 18, 2025 01:04
@goutamvenkat-anyscale
Contributor Author

/gemini summary

@goutamvenkat-anyscale changed the title from "Fix Pushdown Optimizations with Hive Partitioning" to "[Data] - Fix Pushdown Optimizations with Hive Partitioning" Nov 18, 2025
@goutamvenkat-anyscale added the data (Ray Data-related issues) and go (add ONLY when ready to merge, run all tests) labels Nov 18, 2025
@goutamvenkat-anyscale
Contributor Author

/gemini review

Contributor

@gemini-code-assist bot left a comment

Code Review

This pull request correctly addresses the issue of pushdown optimizations with Hive partitioning by distinguishing between data columns and partition columns. The logic to prevent predicate pushdown for expressions involving partition columns is sound and well-implemented across the datasource, logical operator, and optimizer rule. The new tests are comprehensive and cover many combinations of operations, ensuring the fix is robust.

I have a couple of suggestions for minor improvements: one to cache the partition column set for better performance and another to make the test helper ColumnTracker more robust. Overall, this is a great contribution.

@gemini-code-assist
Contributor

Summary of Changes

This pull request resolves a critical bug in Ray Data's handling of Hive-partitioned Parquet datasets, specifically concerning projection and predicate pushdown optimizations. Previously, these optimizations would incorrectly attempt to push down operations on partition columns, which do not exist in the physical Parquet files, leading to errors. The changes introduce logic to differentiate between data columns and partition columns, ensuring that only operations on data columns are pushed down to the underlying read layer (PyArrow), while predicates on partition columns are handled at a higher logical level within Ray Data. This ensures correct and efficient processing of Hive-partitioned data.
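
For illustration, a hedged repro of the user-facing scenario (the bucket path and column names are made up, and the expression-based filter API is assumed from the snippets quoted later in this conversation):

```python
import ray
from ray.data.expressions import col

# Hive layout on disk: s3://bucket/table/year=2024/part-0.parquet, ...
ds = ray.data.read_parquet("s3://bucket/table/")

# "year" exists only in the directory names, not in the Parquet file schema.
# Before this fix the optimizer could push these operations into the physical
# read and fail; now the partition predicate prunes files instead.
filtered = ds.filter(col("year") == 2024)
selected = ds.select_columns(["id", "year"])
```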

Highlights

  • Corrected Pushdown for Hive Partitioning: Fixed an issue where projection and predicate pushdown optimizations failed for Hive-partitioned Parquet datasets because partition columns are not part of the physical file schema.
  • Intelligent Predicate Handling: Predicates involving partition columns are now identified and not pushed down to the read layer, ensuring they are applied correctly after partition columns are added to the dataset.
  • Enhanced Test Coverage: Added comprehensive tests for various data operations (select, rename, filter, with_column) on Hive-partitioned Parquet datasets to validate the fix and ensure robustness.
Changelog
  • python/ray/data/_internal/datasource/parquet_datasource.py
    • Introduced _get_partition_columns_set to identify partition columns.
    • Added _get_data_columns to filter out partition columns from projection maps.
    • Modified apply_predicate to prevent pushing down predicates that reference partition columns.
  • python/ray/data/_internal/logical/operators/read_operator.py
    • Updated apply_predicate to conditionally return the original operator if the predicate cannot be pushed down by the datasource.
  • python/ray/data/_internal/logical/rules/predicate_pushdown.py
    • Adjusted _try_push_down_predicate to retain the Filter operator in the plan if the predicate pushdown to the datasource was unsuccessful.
  • python/ray/data/_internal/planner/plan_expression/expression_visitors.py
    • Added get_column_references utility function to extract column names from an expression, aiding in predicate analysis (see the sketch after this changelog).
  • python/ray/data/tests/test_parquet.py
    • Added a new hive_partitioned_dataset fixture for creating test data.
    • Implemented test_hive_partitioned_parquet_operations with extensive parameterized tests covering various data transformations on Hive-partitioned datasets.
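
As referenced in the changelog above, a hedged sketch of what a get_column_references-style utility does (the real visitor walks Ray Data's own Expr classes; the tiny tree here is illustrative only): collect every column name referenced by a predicate, so the optimizer can tell data columns from partition columns.

```python
from dataclasses import dataclass
from typing import Set, Union

@dataclass
class Col:
    name: str

@dataclass
class BinaryExpr:
    left: Union["BinaryExpr", Col, int]
    op: str
    right: Union["BinaryExpr", Col, int]

def get_column_references(expr) -> Set[str]:
    """Recursively collect all column names referenced by an expression."""
    if isinstance(expr, Col):
        return {expr.name}
    if isinstance(expr, BinaryExpr):
        return get_column_references(expr.left) | get_column_references(expr.right)
    return set()  # literals contribute no columns

# (id > 5) AND (year == 2024) references both "id" and "year".
pred = BinaryExpr(BinaryExpr(Col("id"), ">", 5), "and", BinaryExpr(Col("year"), "==", 2024))
assert get_column_references(pred) == {"id", "year"}
```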
Activity
  • The author, goutamvenkat-anyscale, initiated the summary and review process.
  • The gemini-code-assist bot provided a medium-priority review suggestion to optimize _get_partition_columns_set by making it a functools.cached_property.
  • The gemini-code-assist bot provided a medium-priority review suggestion to improve the ColumnTracker in tests by dynamically initializing it from DataFrame columns.

cursor[bot]

This comment was marked as resolved.

Contributor

@gemini-code-assist bot left a comment

Code Review

This pull request fixes an issue with predicate and projection pushdown for Hive-partitioned Parquet datasets. The core change is to prevent pushdown for operations on partition columns, as they don't exist in the physical file schema. The logic is correctly implemented by checking for partition column references in ParquetDatasource.apply_predicate and propagating the decision to not push down through the logical plan execution. The addition of get_column_references is a useful utility, and the extensive new tests for Hive partitioning are excellent and cover many complex scenarios.

I have two suggestions for improvement:

  1. In ParquetDatasource, caching the set of partition columns to avoid re-computation.
  2. In the new test file, refactoring the long apply_operation helper function to improve readability and reduce code duplication.

Overall, this is a solid contribution that addresses an important correctness issue in data processing.
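
The caching suggestion amounts to something like the following sketch (the class and attribute names are assumed, not the actual Ray Data code): compute the partition-column set once per datasource instance instead of on every pushdown call.

```python
from functools import cached_property
from typing import List, Set

class ParquetDatasourceSketch:
    def __init__(self, partition_columns: List[str]):
        self._partition_columns = list(partition_columns)

    @cached_property
    def _partition_columns_set(self) -> Set[str]:
        # Computed on first access, then memoized on the instance.
        return set(self._partition_columns)
```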

Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale
Contributor Author

/gemini review

Contributor

@gemini-code-assist bot left a comment

Code Review

This pull request introduces a fix for pushdown optimizations (projection and predicate) when using Hive partitioning in Parquet datasources. The core problem is that partition columns don't exist in the physical file schema, causing pushdown attempts to fail. The solution correctly separates data columns from partition columns. Predicates on partition columns are no longer pushed down to the file reader, and projections are handled by filtering out partition columns before reading. The changes are logical and well-implemented, and they are supported by an excellent, comprehensive new test suite that validates numerous combinations of operations. I have one suggestion for a minor performance improvement by caching.

Signed-off-by: Goutam <goutam@anyscale.com>
Comment on lines 507 to 513
# No partition columns in projection map:
# - If _partition_columns is None: user only specified data columns during
# initialization, so include all partition columns (return None)
# - If _partition_columns is not None: partition columns were requested during
# initialization but are missing from _projection_map, which means projection
# pushdown excluded them, so exclude all partition columns (return [])
return None if self._partition_columns is None else []
Member

I think the polymorphism of self._partition_columns (that it can be None, an empty list, or a non-empty list) leads to complex logic like this and makes the attribute harder to reason about.

Would it be simpler if we refactored the code so that self._partition_columns always refers to the selected partition columns (i.e., can't be None)?

Contributor Author

Good point
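
The suggested refactor would resolve the attribute at construction time, roughly like this sketch (names illustrative), so downstream logic never has to branch on None:

```python
from typing import List, Optional

def resolve_partition_columns(
    user_columns: Optional[List[str]], all_partition_cols: List[str]
) -> List[str]:
    """Always return a concrete list of selected partition columns."""
    if user_columns is None:
        # No explicit selection: every partition column is kept.
        return list(all_partition_cols)
    selected = set(user_columns)
    return [c for c in all_partition_cols if c in selected]
```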

# pushdown excluded them, so exclude all partition columns (return [])
return None if self._partition_columns is None else []

def _get_data_columns(self) -> Optional[List[str]]:
Member

Nit: This seems like the counterpart to _get_partition_columns_from_projection, but the naming is inconsistent.

Suggested change
def _get_data_columns(self) -> Optional[List[str]]:
def _get_data_columns_from_projection(self) -> Optional[List[str]]:

Contributor Author

@goutamvenkat-anyscale Nov 19, 2025

This method is from the parent mixin class and is used in other datasources.

Contributor Author

I'll rename _get_partition_columns_from_projection to _get_partition_columns for better alignment.

Comment on lines 192 to 196
# If datasource is unchanged (e.g., predicate references partition columns
# that can't be pushed down), return self unchanged so Filter operator remains
if predicated_datasource is self._datasource:
return self

Member

This logic has the implicit assumption that if _DatasourceProjectionPushdownMixin.apply_predicate returns itself unchanged, then the filter can't be pushed down.

Is there a way we can make this more explicit?

Contributor Author

Actually I can nuke this line

Comment on lines 551 to 554
if referenced_cols & partition_cols:
# Don't push down predicates on partition columns
return self

Member

Dumb question -- can't we still push down the data columns, and just filter the partition values in the reader implementation? Like, is it necessary to disable predicate pushdown altogether?

Contributor Author

@goutamvenkat-anyscale Nov 19, 2025

We can, but then we have to handle cases like ds.filter((col('data_col') > 5) & (col('partition_col') == 2)), where the BinaryExpr has to be split up.

Contributor Author

Alright I handled it.
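
The splitting the author landed on can be sketched as follows (a standalone toy; the real _split_predicate_by_columns works on Ray Data's Expr tree and tracks a residual): a top-level AND splits child by child, so the data part goes to PyArrow and the partition part prunes files, while anything mixing both kinds (e.g. under an OR) is not pushed down and the Filter operator stays in the plan.

```python
from dataclasses import dataclass
from typing import Optional, Set, Tuple

@dataclass
class Pred:
    cols: Set[str]                 # all columns referenced by this subtree
    op: str = "leaf"               # "leaf" | "and" | "or"
    left: Optional["Pred"] = None
    right: Optional["Pred"] = None

def split_predicate(
    p: Pred, part_cols: Set[str]
) -> Tuple[Optional[Pred], Optional[Pred]]:
    """Split into (data_pred, partition_pred); either side may be None."""
    if p.op == "and":
        ld, lp = split_predicate(p.left, part_cols)
        rd, rp = split_predicate(p.right, part_cols)
        def conj(a, b):
            if a is None or b is None:
                return a if b is None else b
            return Pred(a.cols | b.cols, "and", a, b)
        return conj(ld, rd), conj(lp, rp)
    if p.cols and p.cols <= part_cols:
        return None, p             # pure partition predicate: prunes files
    if p.cols and not (p.cols & part_cols):
        return p, None             # pure data predicate: pushed to PyArrow
    return None, None              # mixed (e.g. under OR): Filter op remains

# (id > 5) AND (year == 2024): data part references {id}, partition part {year}.
pred = Pred({"id", "year"}, "and", Pred({"id"}), Pred({"year"}))
data_part, part_part = split_predicate(pred, {"year"})
assert data_part.cols == {"id"} and part_part.cols == {"year"}
```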

],
ids=lambda ops: "_".join(ops) if isinstance(ops, tuple) else ops,
)
def test_hive_partitioned_parquet_operations(
Member

How long does this test take to run? I think this test adds ~30 parameterizations of E2E tests, and that might substantially add to the runtime of our test suite.

Would it be difficult to write some or all these as unit tests? If it is difficult, what would we need to refactor?

Contributor Author

This test is super quick. It takes 5 secs for all combos

Member

OOC is this on a laptop or on a devbox?

Contributor Author

On a laptop

… expr

Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale
Contributor Author

/gemini review

@goutamvenkat-anyscale
Contributor Author

/gemini summary

Contributor

@gemini-code-assist bot left a comment

Code Review

This pull request introduces a significant improvement for query optimization on Hive-partitioned Parquet datasets. By splitting predicates into data-column and partition-column components, it enables both partition pruning (for partition columns) and predicate pushdown to PyArrow (for data columns). The new logic is well-structured and accompanied by extensive tests.

I've identified two main issues: a critical correctness bug in how conservative partition pruning interacts with the filter pushdown logic, and a potential bug in type checking that could lead to incorrect pruning. Please see the detailed comments for more information.

@gemini-code-assist
Contributor

Summary of Changes

This pull request addresses a long-standing issue where Ray Data's pushdown optimizations failed for Hive-partitioned Parquet datasets due to the distinct nature of partition columns (which are not present in physical files). The solution involves a sophisticated mechanism to analyze and split user-defined predicates, allowing data-related conditions to be pushed down to the underlying Parquet reader (PyArrow) for row-level filtering, while partition-related conditions are used for early file-level pruning. This ensures both correctness and significant performance improvements when querying partitioned data.

Highlights

  • Improved Pushdown for Hive Partitioned Data: Ray Data now correctly handles projection and predicate pushdown for Hive-partitioned Parquet datasets by distinguishing between data and partition columns, resolving previous failures where partition columns were incorrectly treated as physical schema columns.
  • Intelligent Predicate Splitting and Pruning: Introduced new logic to split complex predicates into data-only and partition-only components. Data predicates are efficiently pushed down to PyArrow for row-level filtering, while partition predicates are used for early file-level pruning, optimizing read operations.
  • Robust Logical Plan Handling: The logical plan now correctly retains Filter operators when predicates cannot be fully pushed down to the datasource (e.g., due to mixed column types or conservative evaluation), ensuring correctness and preventing unintended data inclusion.
  • Comprehensive Test Coverage: Added extensive unit and integration tests, including a new parameterized test suite for various data operations on Hive-partitioned datasets, to validate the new predicate splitting and pushdown logic across diverse scenarios.
Changelog
  • python/ray/data/_internal/datasource/parquet_datasource.py
    • Introduced _SplitPredicateResult dataclass and _split_predicate_by_columns function for predicate decomposition.
    • Enhanced ParquetDatasource.__init__ to correctly manage _projection_map and _partition_columns based on user selection and actual partitioning.
    • Added _get_partition_columns and _get_data_columns methods to accurately extract column lists for projection.
    • Overhauled apply_predicate to implement combined data pushdown and partition pruning based on predicate splitting.
  • python/ray/data/_internal/logical/operators/read_operator.py
    • Modified apply_predicate to conditionally return the original Read operator if the underlying datasource cannot fully push down the predicate, preserving the Filter operator in the logical plan.
  • python/ray/data/_internal/logical/rules/predicate_pushdown.py
    • Updated _try_push_down_predicate to check if the Read operator was unchanged after predicate application, and if so, to retain the Filter operator in the plan.
  • python/ray/data/_internal/planner/plan_expression/expression_visitors.py
    • Added get_column_references utility to extract column names from expression trees.
  • python/ray/data/datasource/partitioning.py
    • Implemented evaluate_predicate_on_partition in PathPartitionParser to evaluate predicates against partition values from file paths for pruning (see the sketch after this changelog).
  • python/ray/data/tests/test_parquet.py
    • Added hive_partitioned_dataset fixture and test_hive_partitioned_parquet_operations with parameterized tests for various data transformations on Hive-partitioned datasets.
  • python/ray/data/tests/test_partitioning.py
    • Added new tests for evaluate_predicate_on_partition to verify partition predicate evaluation.
  • python/ray/data/tests/unit/test_parquet_predicate_split.py
    • New file containing unit tests for the _split_predicate_by_columns function, covering various predicate structures.
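
As noted in the partitioning.py entry above, file-level pruning with a partition predicate can be sketched like this (a toy parser for the plain key=value Hive layout; the real PathPartitionParser handles encodings and configured partition schemes):

```python
from typing import Callable, Dict, List

def parse_hive_partition_values(path: str) -> Dict[str, str]:
    """Extract {"year": "2024", ...} from ".../year=2024/month=01/f.parquet"."""
    values: Dict[str, str] = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, _, value = segment.partition("=")
            values[key] = value
    return values

def prune_files(
    paths: List[str], predicate: Callable[[Dict[str, str]], bool]
) -> List[str]:
    # Keep only files whose path-derived partition values satisfy the predicate.
    return [p for p in paths if predicate(parse_hive_partition_values(p))]

paths = ["/t/year=2023/a.parquet", "/t/year=2024/b.parquet"]
assert prune_files(paths, lambda v: v.get("year") == "2024") == ["/t/year=2024/b.parquet"]
```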
Activity
  • The author initiated the summary and review process.
  • Reviewers provided feedback on caching _get_partition_columns_set for performance, leading to optimizations.
  • Suggestions were made to refactor test helpers for better readability and robustness, which were implemented.
  • Several bugs were identified and addressed, including issues with projection pushdown for partition columns, incorrect handling of empty datasets, and overly broad type checks in predicate evaluation.
  • Discussions led to the implementation of predicate splitting to allow partial pushdown of data columns while handling partition columns separately, significantly improving optimization capabilities.

Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale
Contributor Author

/gemini review

Contributor

@gemini-code-assist bot left a comment

Code Review

This pull request introduces a significant optimization for reading Hive-partitioned Parquet datasets by enabling predicate and projection pushdown for partition columns. The core logic involves splitting filter predicates into data and partition components, allowing for simultaneous partition pruning and row-level filtering. The implementation is well-structured and includes comprehensive tests. I've identified one critical potential correctness issue regarding filter removal and have a couple of medium-severity suggestions to improve logging and code style.

Signed-off-by: Goutam <goutam@anyscale.com>
@raulchen merged commit 863d5c9 into ray-project:master Nov 20, 2025
6 checks passed
@goutamvenkat-anyscale deleted the goutam/hive_pushdown branch November 21, 2025 03:52
400Ping pushed a commit to 400Ping/ray that referenced this pull request Nov 21, 2025
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025

Labels

data (Ray Data-related issues)
go (add ONLY when ready to merge, run all tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Data] rename_column doesn't work with Hive-partitioned columns

3 participants