Commit 87ed84c

mertak-synnada authored and joroKr21 committed

Feat: Add fetch to CoalescePartitionsExec (apache#14499)

* add fetch info to CoalescePartitionsExec
* use Statistics with_fetch API on CoalescePartitionsExec
* check limit_reached only if fetch is assigned

1 parent eb1dfbf commit 87ed84c

File tree: 12 files changed, +122 -84 lines

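The new behavior can be exercised through the `ExecutionPlan` fetch API. Below is a minimal sketch, not part of the diff: the `MemoryExec` source, schema, and values are illustrative assumptions, and it assumes the `datafusion` crate's usual re-exports.

use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::{displayable, ExecutionPlan};

fn main() -> Result<()> {
    // Two single-batch partitions feeding a CoalescePartitionsExec.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4]))],
    )?;
    let source = MemoryExec::try_new(
        &[vec![batch.clone()], vec![batch]],
        Arc::clone(&schema),
        None,
    )?;
    let coalesce = CoalescePartitionsExec::new(Arc::new(source));

    // `with_fetch` returns a copy of the operator that stops after 5 rows;
    // `fetch` reports the limit back, and the plan display now shows it.
    let limited = coalesce.with_fetch(Some(5)).expect("fetch is supported");
    assert_eq!(limited.fetch(), Some(5));

    // Prints roughly:
    //   CoalescePartitionsExec: fetch=5
    //     MemoryExec: ...
    println!("{}", displayable(limited.as_ref()).indent(false));
    Ok(())
}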

datafusion/core/src/catalog_common/mod.rs

Lines changed: 2 additions & 2 deletions
@@ -117,7 +117,7 @@ pub fn resolve_table_references(
         if !with.recursive {
             // This is a bit hackish as the CTE will be visited again as part of visiting `q`,
             // but thankfully `insert_relation` is idempotent.
-            cte.visit(self);
+            let _ = cte.visit(self);
         }
         self.ctes_in_scope
             .push(ObjectName(vec![cte.alias.name.clone()]));
@@ -188,7 +188,7 @@ pub fn resolve_table_references(
                 visitor.insert_relation(table_name);
             }
             CopyToSource::Query(query) => {
-                query.visit(visitor);
+                let _ = query.visit(visitor);
             }
         },
         DFStatement::Explain(explain) => visit_statement(&explain.statement, visitor),

datafusion/core/tests/physical_optimizer/limit_pushdown.rs

Lines changed: 9 additions & 11 deletions
@@ -130,12 +130,11 @@ fn transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limi
         LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;

     let expected = [
-        "GlobalLimitExec: skip=0, fetch=5",
-        "  CoalescePartitionsExec",
-        "    CoalesceBatchesExec: target_batch_size=8192, fetch=5",
-        "      FilterExec: c3@2 > 0",
-        "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-        "          StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
+        "CoalescePartitionsExec: fetch=5",
+        "  CoalesceBatchesExec: target_batch_size=8192, fetch=5",
+        "    FilterExec: c3@2 > 0",
+        "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+        "        StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
     ];
     assert_eq!(get_plan_string(&after_optimize), expected);

@@ -275,11 +274,10 @@ fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions(
         LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;

     let expected = [
-        "GlobalLimitExec: skip=0, fetch=5",
-        "  CoalescePartitionsExec",
-        "    FilterExec: c3@2 > 0",
-        "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-        "        StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
+        "CoalescePartitionsExec: fetch=5",
+        "  FilterExec: c3@2 > 0",
+        "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+        "      StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
     ];
     assert_eq!(get_plan_string(&after_optimize), expected);

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 0 additions & 7 deletions
@@ -69,11 +69,6 @@ async fn explain_analyze_baseline_metrics() {
         "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
         "metrics=[output_rows=99, elapsed_compute="
     );
-    assert_metrics!(
-        &formatted,
-        "GlobalLimitExec: skip=0, fetch=3, ",
-        "metrics=[output_rows=3, elapsed_compute="
-    );
     assert_metrics!(
         &formatted,
         "ProjectionExec: expr=[count(*)",
@@ -101,9 +96,7 @@ async fn explain_analyze_baseline_metrics() {

         plan.as_any().downcast_ref::<sorts::sort::SortExec>().is_some()
             || plan.as_any().downcast_ref::<physical_plan::aggregates::AggregateExec>().is_some()
-            // CoalescePartitionsExec doesn't do any work so is not included
             || plan.as_any().downcast_ref::<physical_plan::filter::FilterExec>().is_some()
-            || plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
             || plan.as_any().downcast_ref::<physical_plan::limit::LocalLimitExec>().is_some()
             || plan.as_any().downcast_ref::<physical_plan::projection::ProjectionExec>().is_some()
             || plan.as_any().downcast_ref::<physical_plan::coalesce_batches::CoalesceBatchesExec>().is_some()

datafusion/optimizer/src/common_subexpr_eliminate.rs

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,7 @@ use datafusion_expr::logical_plan::{
     Aggregate, Filter, LogicalPlan, Projection, Sort, Window,
 };
 use datafusion_expr::tree_node::replace_sort_expressions;
-use datafusion_expr::{col, BinaryExpr, Case, Expr, GroupingSet, Operator};
+use datafusion_expr::{col, BinaryExpr, Case, Expr, Operator};

 const CSE_PREFIX: &str = "__common_expr";

datafusion/physical-optimizer/src/limit_pushdown.rs

Lines changed: 0 additions & 9 deletions
@@ -145,15 +145,6 @@ pub fn pushdown_limit_helper(
     global_state.skip = skip;
     global_state.fetch = fetch;

-    if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {
-        // If the child is a `CoalescePartitionsExec`, we should not remove the limit
-        // the push_down through the `CoalescePartitionsExec` to each partition will not guarantee the limit.
-        // TODO: we may have a better solution if we can support with_fetch for limit inside CoalescePartitionsExec.
-        // Follow-up issue: https://github.com/apache/datafusion/issues/14446
-        global_state.satisfied = true;
-        return Ok((Transformed::no(pushdown_plan), global_state));
-    }
-
     // Now the global state has the most recent information, we can remove
     // the `LimitExec` plan. We will decide later if we should add it again
     // or not.
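With `with_fetch` now implemented on `CoalescePartitionsExec`, the special case above is no longer needed: the limit can be folded into the operator itself. A simplified illustration of that idea follows; `absorb_limit` is a hypothetical helper for this page, not the actual `pushdown_limit_helper` logic.

use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;

// Hypothetical helper, for illustration only: if an operator can absorb a
// fetch (as CoalescePartitionsExec now can), use the fetching version;
// otherwise keep the plan unchanged and leave the limit node above it.
fn absorb_limit(plan: Arc<dyn ExecutionPlan>, fetch: usize) -> Arc<dyn ExecutionPlan> {
    match plan.with_fetch(Some(fetch)) {
        Some(with_fetch) => with_fetch, // e.g. "CoalescePartitionsExec: fetch=5"
        None => plan,
    }
}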

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 31 additions & 8 deletions
@@ -43,6 +43,8 @@ pub struct CoalescePartitionsExec {
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     cache: PlanProperties,
+    /// Optional number of rows to fetch. Stops producing rows after this fetch
+    pub(crate) fetch: Option<usize>,
 }

 impl CoalescePartitionsExec {
@@ -53,6 +55,7 @@ impl CoalescePartitionsExec {
             input,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
+            fetch: None,
         }
     }

@@ -82,9 +85,12 @@ impl DisplayAs for CoalescePartitionsExec {
         f: &mut std::fmt::Formatter,
     ) -> std::fmt::Result {
         match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "CoalescePartitionsExec")
-            }
+            DisplayFormatType::Default | DisplayFormatType::Verbose => match self.fetch {
+                Some(fetch) => {
+                    write!(f, "CoalescePartitionsExec: fetch={fetch}")
+                }
+                None => write!(f, "CoalescePartitionsExec"),
+            },
         }
     }
 }
@@ -115,9 +121,9 @@ impl ExecutionPlan for CoalescePartitionsExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(CoalescePartitionsExec::new(Arc::clone(
-            &children[0],
-        ))))
+        let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0]));
+        plan.fetch = self.fetch;
+        Ok(Arc::new(plan))
     }

     fn execute(
@@ -163,7 +169,11 @@ impl ExecutionPlan for CoalescePartitionsExec {
                 }

                 let stream = builder.build();
-                Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)))
+                Ok(Box::pin(ObservedStream::new(
+                    stream,
+                    baseline_metrics,
+                    self.fetch,
+                )))
             }
         }
     }
@@ -173,7 +183,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
     }

     fn statistics(&self) -> Result<Statistics> {
-        self.input.statistics()
+        Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
     }

     fn supports_limit_pushdown(&self) -> bool {
@@ -183,6 +193,19 @@ impl ExecutionPlan for CoalescePartitionsExec {
     fn cardinality_effect(&self) -> CardinalityEffect {
         CardinalityEffect::Equal
     }
+
+    fn fetch(&self) -> Option<usize> {
+        self.fetch
+    }
+
+    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
+        Some(Arc::new(CoalescePartitionsExec {
+            input: Arc::clone(&self.input),
+            fetch: limit,
+            metrics: self.metrics.clone(),
+            cache: self.cache.clone(),
+        }))
+    }
 }

 #[cfg(test)]
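The `statistics` change means the reported row count reflects the fetch rather than the raw input cardinality. A small sketch under the same illustrative two-partition `MemoryExec` assumption as the example near the top of this page:

use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4]))],
    )?;
    // 8 rows spread over two partitions.
    let source = MemoryExec::try_new(&[vec![batch.clone()], vec![batch]], schema, None)?;
    let coalesce = CoalescePartitionsExec::new(Arc::new(source));

    // Without a fetch the input statistics pass through (8 rows); with a
    // fetch, `Statistics::with_fetch` caps the row-count estimate at 5.
    println!("no fetch: {:?}", coalesce.statistics()?.num_rows);
    let limited = coalesce.with_fetch(Some(5)).expect("fetch is supported");
    println!("fetch=5:  {:?}", limited.statistics()?.num_rows);
    Ok(())
}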

datafusion/physical-plan/src/stream.rs

Lines changed: 30 additions & 1 deletion
@@ -422,18 +422,44 @@ impl Stream for EmptyRecordBatchStream {
 pub(crate) struct ObservedStream {
     inner: SendableRecordBatchStream,
     baseline_metrics: BaselineMetrics,
+    fetch: Option<usize>,
+    produced: usize,
 }

 impl ObservedStream {
     pub fn new(
         inner: SendableRecordBatchStream,
         baseline_metrics: BaselineMetrics,
+        fetch: Option<usize>,
     ) -> Self {
         Self {
             inner,
             baseline_metrics,
+            fetch,
+            produced: 0,
         }
     }
+
+    fn limit_reached(
+        &mut self,
+        poll: Poll<Option<Result<RecordBatch>>>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        let Some(fetch) = self.fetch else { return poll };
+
+        if self.produced >= fetch {
+            return Poll::Ready(None);
+        }
+
+        if let Poll::Ready(Some(Ok(batch))) = &poll {
+            if self.produced + batch.num_rows() > fetch {
+                let batch = batch.slice(0, fetch.saturating_sub(self.produced));
+                self.produced += batch.num_rows();
+                return Poll::Ready(Some(Ok(batch)));
+            };
+            self.produced += batch.num_rows()
+        }
+        poll
+    }
 }

 impl RecordBatchStream for ObservedStream {
@@ -449,7 +475,10 @@ impl futures::Stream for ObservedStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        let poll = self.inner.poll_next_unpin(cx);
+        let mut poll = self.inner.poll_next_unpin(cx);
+        if self.fetch.is_some() {
+            poll = self.limit_reached(poll);
+        }
         self.baseline_metrics.record_poll(poll)
     }
 }
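The truncation rule in `limit_reached` can be shown in isolation. The sketch below reimplements the same row arithmetic on plain `RecordBatch` values; `apply_fetch` is a hypothetical helper written for this page, not part of the patch.

use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;

// Mirrors the counting in ObservedStream::limit_reached: pass batches through
// until `fetch` rows have been produced, slice the batch that crosses the
// boundary, and drop everything after it.
fn apply_fetch(batch: RecordBatch, produced: &mut usize, fetch: usize) -> Option<RecordBatch> {
    if *produced >= fetch {
        return None; // the stream would return Poll::Ready(None) here
    }
    if *produced + batch.num_rows() > fetch {
        let batch = batch.slice(0, fetch - *produced);
        *produced += batch.num_rows();
        return Some(batch);
    }
    *produced += batch.num_rows();
    Some(batch)
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4]))],
    )
    .unwrap();

    // fetch=6 over three 4-row batches: 4 rows, then 2 rows, then end of stream.
    let mut produced = 0;
    assert_eq!(apply_fetch(batch.clone(), &mut produced, 6).unwrap().num_rows(), 4);
    assert_eq!(apply_fetch(batch.clone(), &mut produced, 6).unwrap().num_rows(), 2);
    assert!(apply_fetch(batch, &mut produced, 6).is_none());
}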

datafusion/physical-plan/src/union.rs

Lines changed: 10 additions & 2 deletions
@@ -229,7 +229,11 @@ impl ExecutionPlan for UnionExec {
         if partition < input.output_partitioning().partition_count() {
             let stream = input.execute(partition, context)?;
             debug!("Found a Union partition to execute");
-            return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+            return Ok(Box::pin(ObservedStream::new(
+                stream,
+                baseline_metrics,
+                None,
+            )));
         } else {
             partition -= input.output_partitioning().partition_count();
         }
@@ -417,7 +421,11 @@ impl ExecutionPlan for InterleaveExec {
             self.schema(),
             input_stream_vec,
         ));
-        return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+        return Ok(Box::pin(ObservedStream::new(
+            stream,
+            baseline_metrics,
+            None,
+        )));
     }

     warn!("Error in InterleaveExec: Partition {} not found", partition);

datafusion/sqllogictest/test_files/aggregate.slt

Lines changed: 11 additions & 12 deletions
@@ -4971,18 +4971,17 @@ logical_plan
 03)----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[min(aggregate_test_100.c1)]]
 04)------TableScan: aggregate_test_100 projection=[c1, c3]
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=5
-02)--CoalescePartitionsExec
-03)----AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-04)------CoalesceBatchesExec: target_batch_size=8192
-05)--------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-07)------------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[min(aggregate_test_100.c1)]
-08)--------------CoalesceBatchesExec: target_batch_size=8192
-09)----------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
-10)------------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)]
-11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-12)----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
+01)CoalescePartitionsExec: fetch=5
+02)--AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4
+05)--------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+06)----------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[min(aggregate_test_100.c1)]
+07)------------CoalesceBatchesExec: target_batch_size=8192
+08)--------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
+09)----------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+11)--------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true


 #

datafusion/sqllogictest/test_files/limit.slt

Lines changed: 2 additions & 3 deletions
@@ -852,9 +852,8 @@ physical_plan
 01)ProjectionExec: expr=[foo@0 as foo]
 02)--SortExec: TopK(fetch=1000), expr=[part_key@1 ASC NULLS LAST], preserve_partitioning=[false]
 03)----ProjectionExec: expr=[1 as foo, part_key@0 as part_key]
-04)------GlobalLimitExec: skip=0, fetch=1
-05)--------CoalescePartitionsExec
-06)----------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], limit=1
+04)------CoalescePartitionsExec: fetch=1
+05)--------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], limit=1

 query I
 with selection as (
