diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index d05a25a67955..153d03b3ab49 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -22,6 +22,7 @@ use std::fmt; use std::fmt::{Debug, Formatter}; use std::sync::Arc; +use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_plan::execution_plan::{ Boundedness, EmissionType, SchedulingType, }; @@ -320,7 +321,39 @@ impl ExecutionPlan for DataSourceExec { &self, projection: &ProjectionExec, ) -> Result>> { - self.data_source.try_swapping_with_projection(projection) + match self.data_source.try_swapping_with_projection(projection)? { + Some(new_plan) => { + if let Some(new_data_source_exec) = + new_plan.as_any().downcast_ref::() + { + let projection_mapping = ProjectionMapping::try_new( + projection.expr().iter().cloned(), + &self.schema(), + )?; + + // Project the equivalence properties to the new schema + let projected_eq_properties = self + .cache + .eq_properties + .project(&projection_mapping, new_data_source_exec.schema()); + + let preserved_exec = DataSourceExec { + data_source: Arc::clone(&new_data_source_exec.data_source), + cache: PlanProperties::new( + projected_eq_properties, + new_data_source_exec.cache.partitioning.clone(), + new_data_source_exec.cache.emission_type, + new_data_source_exec.cache.boundedness, + ) + .with_scheduling_type(new_data_source_exec.cache.scheduling_type), + }; + Ok(Some(Arc::new(preserved_exec))) + } else { + Ok(Some(new_plan)) + } + } + None => Ok(None), + } } fn handle_child_pushdown_result( diff --git a/datafusion/sqllogictest/data/1.parquet b/datafusion/sqllogictest/data/1.parquet new file mode 100644 index 000000000000..35be09f40666 Binary files /dev/null and b/datafusion/sqllogictest/data/1.parquet differ diff --git a/datafusion/sqllogictest/data/2.parquet b/datafusion/sqllogictest/data/2.parquet new file mode 100644 index 000000000000..4ec920f1fad3 Binary files /dev/null and b/datafusion/sqllogictest/data/2.parquet differ diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 30c0df3f644d..9c20941e211e 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -543,3 +543,35 @@ query TT select val, part from t_pushdown where part = val AND part = 'a'; ---- a a + +statement ok +COPY ( + SELECT + '00000000000000000000000000000001' AS trace_id, + '2023-10-01 00:00:00'::timestamptz AS start_timestamp, + 'prod' as deployment_environment +) +TO 'data/1.parquet'; + +statement ok +COPY ( + SELECT + '00000000000000000000000000000002' AS trace_id, + '2024-10-01 00:00:00'::timestamptz AS start_timestamp, + 'staging' as deployment_environment +) +TO 'data/2.parquet'; + +statement ok +CREATE EXTERNAL TABLE t1 STORED AS PARQUET LOCATION 'data/'; + +statement ok +SET datafusion.execution.parquet.pushdown_filters = true; + +query T +SELECT deployment_environment +FROM t1 +WHERE trace_id = '00000000000000000000000000000002' +ORDER BY start_timestamp, trace_id; +---- +staging