Skip to content

Commit 2cb84e6

Browse files
Dandandanavantgardnerio
authored andcommitted
Make limit pushdown work for SortPreservingMergeExec (apache#17893) (#361)
1 parent 6d250d7 commit 2cb84e6

File tree

1 file changed

+17
-0
lines changed

1 file changed

+17
-0
lines changed

datafusion/physical-optimizer/src/limit_pushdown_past_window.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use datafusion_expr::{WindowFrameBound, WindowFrameUnits};
2323
use datafusion_physical_plan::execution_plan::CardinalityEffect;
2424
use datafusion_physical_plan::limit::GlobalLimitExec;
2525
use datafusion_physical_plan::sorts::sort::SortExec;
26+
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
2627
use datafusion_physical_plan::windows::BoundedWindowAggExec;
2728
use datafusion_physical_plan::ExecutionPlan;
2829
use std::cmp;
@@ -91,6 +92,22 @@ impl PhysicalOptimizerRule for LimitPushPastWindows {
9192
return Ok(Transformed::no(node));
9293
}
9394

95+
// Apply the limit if we hit a sortpreservingmerge node
96+
if let Some(spm) = node.as_any().downcast_ref::<SortPreservingMergeExec>() {
97+
let latest = latest_limit.take();
98+
let Some(fetch) = latest else {
99+
latest_max = 0;
100+
return Ok(Transformed::no(node));
101+
};
102+
let fetch = match spm.fetch() {
103+
None => fetch + latest_max,
104+
Some(existing) => cmp::min(existing, fetch + latest_max),
105+
};
106+
let spm: Arc<dyn ExecutionPlan> = spm.with_fetch(Some(fetch)).unwrap();
107+
latest_max = 0;
108+
return Ok(Transformed::complete(spm));
109+
}
110+
94111
// Apply the limit if we hit a sort node
95112
if let Some(sort) = node.as_any().downcast_ref::<SortExec>() {
96113
let latest = latest_limit.take();

0 commit comments

Comments
 (0)