Skip to content

Commit f03a8fd

Browse files
authored
Fix parquet filter_pushdown: respect parquet filter pushdown config in scan (apache#16646)
* respect parquet filter pushdown config in scan * Add test
1 parent 25c2a07 commit f03a8fd

File tree

2 files changed

+132
-4
lines changed

2 files changed

+132
-4
lines changed

datafusion/datasource-parquet/src/source.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -343,9 +343,7 @@ impl ParquetSource {
343343
}
344344

345345
/// If true, the predicate will be used during the parquet scan.
346-
/// Defaults to false
347-
///
348-
/// [`Expr`]: datafusion_expr::Expr
346+
/// Defaults to false.
349347
pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
350348
self.table_parquet_options.global.pushdown_filters = pushdown_filters;
351349
self
@@ -625,7 +623,13 @@ impl FileSource for ParquetSource {
625623
let Some(file_schema) = self.file_schema.clone() else {
626624
return Ok(FilterPushdownPropagation::unsupported(filters));
627625
};
628-
// Can we push down the filters themselves into the scan or only use stats pruning?
626+
// Determine if based on configs we should push filters down.
627+
// If either the table / scan itself or the config has pushdown enabled,
628+
// we will push down the filters.
629+
// If both are disabled, we will not push down the filters.
630+
// By default they are both disabled.
631+
// Regardless of pushdown, we will update the predicate to include the filters
632+
// because even if scan pushdown is disabled we can still use the filters for stats pruning.
629633
let config_pushdown_enabled = config.execution.parquet.pushdown_filters;
630634
let table_pushdown_enabled = self.pushdown_filters();
631635
let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled;
@@ -647,6 +651,7 @@ impl FileSource for ParquetSource {
647651
None => conjunction(allowed_filters.iter().cloned()),
648652
};
649653
source.predicate = Some(predicate);
654+
source = source.with_pushdown_filters(pushdown_filters);
650655
let source = Arc::new(source);
651656
// If pushdown_filters is false we tell our parents that they still have to handle the filters,
652657
// even if we updated the predicate to include the filters (they will only be used for stats pruning).

datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,31 @@ NULL
7575
NULL
7676
NULL
7777

78+
query T
79+
select a from t_pushdown where b > 2 ORDER BY a;
80+
----
81+
baz
82+
foo
83+
NULL
84+
NULL
85+
NULL
86+
87+
query TT
88+
EXPLAIN select a from t where b > 2 ORDER BY a;
89+
----
90+
logical_plan
91+
01)Sort: t.a ASC NULLS LAST
92+
02)--Projection: t.a
93+
03)----Filter: t.b > Int32(2)
94+
04)------TableScan: t projection=[a, b], partial_filters=[t.b > Int32(2)]
95+
physical_plan
96+
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
97+
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
98+
03)----CoalesceBatchesExec: target_batch_size=8192
99+
04)------FilterExec: b@1 > 2, projection=[a@0]
100+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
101+
06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
102+
78103
query TT
79104
EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
80105
----
@@ -88,6 +113,104 @@ physical_plan
88113
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
89114
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
90115

116+
# If we set the setting to `true` it override's the table's setting
117+
statement ok
118+
set datafusion.execution.parquet.pushdown_filters = true;
119+
120+
query T
121+
select a from t where b > 2 ORDER BY a;
122+
----
123+
baz
124+
foo
125+
NULL
126+
NULL
127+
NULL
128+
129+
query T
130+
select a from t_pushdown where b > 2 ORDER BY a;
131+
----
132+
baz
133+
foo
134+
NULL
135+
NULL
136+
NULL
137+
138+
query TT
139+
EXPLAIN select a from t where b > 2 ORDER BY a;
140+
----
141+
logical_plan
142+
01)Sort: t.a ASC NULLS LAST
143+
02)--Projection: t.a
144+
03)----Filter: t.b > Int32(2)
145+
04)------TableScan: t projection=[a, b], partial_filters=[t.b > Int32(2)]
146+
physical_plan
147+
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
148+
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
149+
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
150+
151+
query TT
152+
EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
153+
----
154+
logical_plan
155+
01)Sort: t_pushdown.a ASC NULLS LAST
156+
02)--Projection: t_pushdown.a
157+
03)----Filter: t_pushdown.b > Int32(2)
158+
04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]
159+
physical_plan
160+
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
161+
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
162+
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
163+
164+
# If we reset the default the table created without pushdown goes back to disabling it
165+
statement ok
166+
set datafusion.execution.parquet.pushdown_filters = false;
167+
168+
query T
169+
select a from t where b > 2 ORDER BY a;
170+
----
171+
baz
172+
foo
173+
NULL
174+
NULL
175+
NULL
176+
177+
query T
178+
select a from t_pushdown where b > 2 ORDER BY a;
179+
----
180+
baz
181+
foo
182+
NULL
183+
NULL
184+
NULL
185+
186+
query TT
187+
EXPLAIN select a from t where b > 2 ORDER BY a;
188+
----
189+
logical_plan
190+
01)Sort: t.a ASC NULLS LAST
191+
02)--Projection: t.a
192+
03)----Filter: t.b > Int32(2)
193+
04)------TableScan: t projection=[a, b], partial_filters=[t.b > Int32(2)]
194+
physical_plan
195+
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
196+
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
197+
03)----CoalesceBatchesExec: target_batch_size=8192
198+
04)------FilterExec: b@1 > 2, projection=[a@0]
199+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
200+
06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
201+
202+
query TT
203+
EXPLAIN select a from t_pushdown where b > 2 ORDER BY a;
204+
----
205+
logical_plan
206+
01)Sort: t_pushdown.a ASC NULLS LAST
207+
02)--Projection: t_pushdown.a
208+
03)----Filter: t_pushdown.b > Int32(2)
209+
04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)]
210+
physical_plan
211+
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
212+
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
213+
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
91214

92215
# When filter pushdown *is* enabled, ParquetExec can filter exactly,
93216
# not just metadata, so we expect to see no FilterExec

0 commit comments

Comments
 (0)