Closed
Labels: bug (Something isn't working)
Description
COPY (
SELECT
'00000000000000000000000000000001' AS trace_id,
'2023-10-01 00:00:00'::timestamptz AS start_timestamp,
    'prod' AS deployment_environment
)
TO 'data/1.parquet';
COPY (
SELECT
'00000000000000000000000000000002' AS trace_id,
'2024-10-01 00:00:00'::timestamptz AS start_timestamp,
    'staging' AS deployment_environment
)
TO 'data/2.parquet';
CREATE EXTERNAL TABLE t1 STORED AS PARQUET LOCATION 'data/';
SET datafusion.execution.parquet.pushdown_filters = true;
SELECT deployment_environment
FROM t1
WHERE trace_id = '00000000000000000000000000000002'
ORDER BY start_timestamp, trace_id;

Running this fails with:

SanityCheckPlan
caused by
Error during planning: Plan:
  SortPreservingMergeExec: [start_timestamp@1 ASC NULLS LAST, trace_id@2 ASC NULLS LAST]
    SortExec: expr=[start_timestamp@1 ASC NULLS LAST], preserve_partitioning=[true]
      DataSourceExec: file_groups={2 groups: [[Users/adriangb/GitHub/datafusion/data/1.parquet], [Users/adriangb/GitHub/datafusion/data/2.parquet]]}, projection=[deployment_environment, start_timestamp, trace_id], file_type=parquet, predicate=trace_id@0 = 00000000000000000000000000000002, pruning_predicate=trace_id_null_count@2 != row_count@3 AND trace_id_min@0 <= 00000000000000000000000000000002 AND 00000000000000000000000000000002 <= trace_id_max@1, required_guarantees=[trace_id in (00000000000000000000000000000002)]
does not satisfy order requirements: [start_timestamp@1 ASC NULLS LAST, trace_id@2 ASC NULLS LAST]. Child-0 order: [[start_timestamp@1 ASC NULLS LAST]]
SanityCheckPlan fails because the equivalence properties from DataSourceExec indicating that trace_id is a constant are lost.
This happens because FileScanConfig::try_swapping_with_projection blindly creates a new DataSourceExec (it's quite bad IMO that FileScanConfig is even creating a DataSourceExec), which loses the DataSourceExec.cache that was populated in DataSourceExec::handle_child_pushdown_result. In other words, projection pushdown resets the plan properties that were computed when filter pushdown was done, and they cannot be re-computed because DataSourceExec does not retain a copy of the filter.
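To illustrate the mechanism in isolation, here is a toy Rust sketch with no DataFusion dependency (the `Node`, `Config`, and `PlanProperties` types below are invented for illustration, not DataFusion APIs). It shows how rebuilding a node purely from its config discards properties that were computed after construction, which is the same shape of bug: the rebuilt node has no memory of the pushed-down filter.

```rust
// Toy analogue (assumed names, not DataFusion APIs): a node caches
// properties computed after construction; rebuilding it from the
// config alone silently drops them, mirroring how
// try_swapping_with_projection discards the equivalence info that
// handle_child_pushdown_result had added.

#[derive(Clone, Debug, PartialEq)]
struct PlanProperties {
    // Columns known to be constant thanks to pushed-down filters.
    constants: Vec<String>,
}

#[derive(Clone)]
struct Config {
    projection: Vec<usize>,
}

struct Node {
    config: Config,
    cache: PlanProperties, // updated in place by filter pushdown
}

impl Node {
    fn from_config(config: Config) -> Self {
        // A fresh build knows nothing about pushed-down filters.
        Node {
            config,
            cache: PlanProperties { constants: vec![] },
        }
    }

    fn with_pushed_filter(mut self, constant_col: &str) -> Self {
        // Analogue of handle_child_pushdown_result: record the constant.
        self.cache.constants.push(constant_col.to_string());
        self
    }

    fn swap_projection(&self, projection: Vec<usize>) -> Self {
        // Analogue of try_swapping_with_projection: rebuilds from the
        // config only, so `cache` (and the known constant) is lost.
        let mut config = self.config.clone();
        config.projection = projection;
        Node::from_config(config)
    }
}

fn main() {
    let node = Node::from_config(Config { projection: vec![0, 1, 2] })
        .with_pushed_filter("trace_id");
    assert_eq!(node.cache.constants, vec!["trace_id".to_string()]);

    let rebuilt = node.swap_projection(vec![0, 1]);
    // The equivalence info is gone after the rebuild.
    assert!(rebuilt.cache.constants.is_empty());
    println!("constants after rebuild: {:?}", rebuilt.cache.constants);
}
```

In this sketch the fix would be for `swap_projection` to carry `self.cache` (or the filter needed to recompute it) over to the rebuilt node rather than starting from an empty cache.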
datafusion/datafusion/datasource/src/file_scan_config.rs
Lines 607 to 642 in 20bb7e6
fn try_swapping_with_projection(
    &self,
    projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    // This process can be moved into CsvExec, but it would be an overlap of their responsibility.
    // Must be all column references, with no table partition columns (which can not be projected)
    let partitioned_columns_in_proj = projection.expr().iter().any(|(expr, _)| {
        expr.as_any()
            .downcast_ref::<Column>()
            .map(|expr| expr.index() >= self.file_schema.fields().len())
            .unwrap_or(false)
    });
    // If there is any non-column or alias-carrier expression, Projection should not be removed.
    let no_aliases = all_alias_free_columns(projection.expr());
    Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
        let file_scan = self.clone();
        let source = Arc::clone(&file_scan.file_source);
        let new_projections = new_projections_for_columns(
            projection,
            &file_scan
                .projection
                .clone()
                .unwrap_or_else(|| (0..self.file_schema.fields().len()).collect()),
        );
        DataSourceExec::from_data_source(
            FileScanConfigBuilder::from(file_scan)
                // Assign projected statistics to source
                .with_projection(Some(new_projections))
                .with_source(source)
                .build(),
        ) as _
    }))
}
new_node = new_node.add_filter_equivalence_info(filter)?;