Skip to content

Commit 2cdf059

Browse files
LiaCastaneda, AdamGS, friendlymatthew
authored
[branch-49] Backport apache#17129 to branch 49 (apache#17143) (#42)
* Preserve equivalence properties during projection pushdown (apache#17129)
* Adds parquet data diffs

---------

Co-authored-by: Adam Gutglick <adam@spiraldb.com>
Co-authored-by: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com>
1 parent 28c0673 commit 2cdf059

File tree

4 files changed

+66
-1
lines changed

4 files changed

+66
-1
lines changed

datafusion/datasource/src/source.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::fmt;
2222
use std::fmt::{Debug, Formatter};
2323
use std::sync::Arc;
2424

25+
use datafusion_physical_expr::equivalence::ProjectionMapping;
2526
use datafusion_physical_plan::execution_plan::{
2627
Boundedness, EmissionType, SchedulingType,
2728
};
@@ -324,7 +325,39 @@ impl ExecutionPlan for DataSourceExec {
324325
&self,
325326
projection: &ProjectionExec,
326327
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
327-
self.data_source.try_swapping_with_projection(projection)
328+
match self.data_source.try_swapping_with_projection(projection)? {
329+
Some(new_plan) => {
330+
if let Some(new_data_source_exec) =
331+
new_plan.as_any().downcast_ref::<DataSourceExec>()
332+
{
333+
let projection_mapping = ProjectionMapping::try_new(
334+
projection.expr().iter().cloned(),
335+
&self.schema(),
336+
)?;
337+
338+
// Project the equivalence properties to the new schema
339+
let projected_eq_properties = self
340+
.cache
341+
.eq_properties
342+
.project(&projection_mapping, new_data_source_exec.schema());
343+
344+
let preserved_exec = DataSourceExec {
345+
data_source: Arc::clone(&new_data_source_exec.data_source),
346+
cache: PlanProperties::new(
347+
projected_eq_properties,
348+
new_data_source_exec.cache.partitioning.clone(),
349+
new_data_source_exec.cache.emission_type,
350+
new_data_source_exec.cache.boundedness,
351+
)
352+
.with_scheduling_type(new_data_source_exec.cache.scheduling_type),
353+
};
354+
Ok(Some(Arc::new(preserved_exec)))
355+
} else {
356+
Ok(Some(new_plan))
357+
}
358+
}
359+
None => Ok(None),
360+
}
328361
}
329362

330363
fn handle_child_pushdown_result(
1.35 KB
Binary file not shown.
1.37 KB
Binary file not shown.

datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,3 +528,35 @@ query TT
528528
select val, part from t_pushdown where part = val AND part = 'a';
529529
----
530530
a a
531+
532+
statement ok
533+
COPY (
534+
SELECT
535+
'00000000000000000000000000000001' AS trace_id,
536+
'2023-10-01 00:00:00'::timestamptz AS start_timestamp,
537+
'prod' as deployment_environment
538+
)
539+
TO 'data/1.parquet';
540+
541+
statement ok
542+
COPY (
543+
SELECT
544+
'00000000000000000000000000000002' AS trace_id,
545+
'2024-10-01 00:00:00'::timestamptz AS start_timestamp,
546+
'staging' as deployment_environment
547+
)
548+
TO 'data/2.parquet';
549+
550+
statement ok
551+
CREATE EXTERNAL TABLE t1 STORED AS PARQUET LOCATION 'data/';
552+
553+
statement ok
554+
SET datafusion.execution.parquet.pushdown_filters = true;
555+
556+
query T
557+
SELECT deployment_environment
558+
FROM t1
559+
WHERE trace_id = '00000000000000000000000000000002'
560+
ORDER BY start_timestamp, trace_id;
561+
----
562+
staging

0 commit comments

Comments
 (0)