Skip to content

Commit a7ce3bc

Browse files
committed
fixes
1 parent e59aac5 commit a7ce3bc

File tree

5 files changed

+14
-15
lines changed

5 files changed

+14
-15
lines changed

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ async fn check_stats_precision_with_filter_pushdown() {
7575
let exec = FilterPushdown::new()
7676
.optimize(Arc::new(exec), state.config().options())
7777
.unwrap();
78+
println!("exec: {:?}", exec);
7879
let filter_exec = exec.as_any().downcast_ref::<FilterExec>().unwrap();
7980
// TODO: we need to get the FilterExec to push down its filters
8081
// since they no longer get applied to the DataSourceExec directly.

datafusion/datasource/src/source.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,12 @@ impl ExecutionPlan for DataSourceExec {
220220
.push_down_filters(parent_filters_remaining)?
221221
{
222222
let new_self = Arc::new(DataSourceExec::new(pushdown_result.inner));
223-
return Ok(Some(ExecutionPlanFilterPushdownResult::new(
223+
Ok(Some(ExecutionPlanFilterPushdownResult::new(
224224
new_self,
225225
pushdown_result.support,
226-
)));
226+
)))
227227
} else {
228-
return Ok(None);
228+
Ok(None)
229229
}
230230
}
231231
}

datafusion/physical-optimizer/src/filter_pushdown.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ impl ChildPushdownState {
7070
}
7171
}
7272

73+
/// See [`pushdown_filters`] for more details.
7374
fn push_down_into_children(
7475
node: &Arc<dyn ExecutionPlan>,
7576
filters: &[Arc<dyn PhysicalExpr>],
@@ -78,7 +79,7 @@ fn push_down_into_children(
7879
let mut new_children = Vec::with_capacity(children.len());
7980
let mut filter_pushdown_result = vec![ChildPushdownState::NoChild; filters.len()];
8081
for child in children {
81-
if let Some(result) = pushdown_filters(child, &filters)? {
82+
if let Some(result) = pushdown_filters(child, filters)? {
8283
new_children.push(result.inner);
8384
for (idx, support) in result.support.iter().enumerate() {
8485
filter_pushdown_result[idx] =
@@ -126,7 +127,7 @@ fn pushdown_filters(
126127
// These are the filters the current node "owns" or "produces" and wants to push down.
127128
let node_filters = node.filters_for_pushdown()?;
128129
// Check which nodes from parents this node is okay with us trying to push down to it's children.
129-
let parent_pushdown_request_result = node.filter_pushdown_request(&parent_filters)?;
130+
let parent_pushdown_request_result = node.filter_pushdown_request(parent_filters)?;
130131
// Do some index masking so that we only ever call nodes with the filters relevant to them / that they're allowed to touch.
131132
// But we still need to reconstruct the full result for our caller.
132133
let parent_filter_for_pushdown_indices = parent_pushdown_request_result
@@ -147,7 +148,7 @@ fn pushdown_filters(
147148
let all_filters_to_push_down = node_filters
148149
.iter()
149150
.chain(parent_filters_to_push_down.iter())
150-
.map(|f| Arc::clone(f))
151+
.map(Arc::clone)
151152
.collect::<Vec<_>>();
152153
// Push down into children
153154
let child_pushdown_result = push_down_into_children(node, &all_filters_to_push_down)?;
@@ -484,7 +485,6 @@ impl PhysicalOptimizerRule for FilterPushdown {
484485
_config: &ConfigOptions,
485486
) -> Result<Arc<dyn ExecutionPlan>> {
486487
if let Some(result) = pushdown_filters(&plan, &[])? {
487-
println!("new plan: {:?}", result.inner);
488488
Ok(result.inner)
489489
} else {
490490
Ok(plan)

datafusion/physical-plan/src/filter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -448,12 +448,12 @@ impl ExecutionPlan for FilterExec {
448448
// We do however need to remap the columns.
449449
let input_schema = self.input.schema();
450450
let filters = filters
451-
.into_iter()
451+
.iter()
452452
.map(|f| reassign_predicate_columns(Arc::clone(f), &input_schema, false))
453453
.collect::<Result<Vec<_>>>()?;
454454
Ok(filters
455455
.into_iter()
456-
.map(|f| FilterPushdownAllowed::Allowed(f))
456+
.map(FilterPushdownAllowed::Allowed)
457457
.collect())
458458
}
459459

@@ -484,7 +484,7 @@ impl ExecutionPlan for FilterExec {
484484
}
485485
})
486486
// Combine that with any leftover filters from parents that our children couldn't handle
487-
.chain(parent_filters_remaining.iter().map(|f| Arc::clone(f)));
487+
.chain(parent_filters_remaining.iter().map(Arc::clone));
488488

489489
let new_predicate = conjunction(new_filters);
490490

datafusion/physical-plan/src/sorts/sort_filters.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,9 @@ impl SortDynamicFilterSource {
141141
replace = false;
142142
break;
143143
}
144-
} else {
145-
if !new_value_is_greater_than_current {
146-
replace = false;
147-
break;
148-
}
144+
} else if !new_value_is_greater_than_current {
145+
replace = false;
146+
break;
149147
}
150148
// Handle the equality case
151149
if new_value.eq(current_value) {

0 commit comments

Comments
 (0)