4 changes: 2 additions & 2 deletions datafusion/core/src/catalog_common/mod.rs
@@ -117,7 +117,7 @@ pub fn resolve_table_references(
if !with.recursive {
// This is a bit hackish as the CTE will be visited again as part of visiting `q`,
// but thankfully `insert_relation` is idempotent.
-                cte.visit(self);
+                let _ = cte.visit(self);
}
self.ctes_in_scope
.push(ObjectName(vec![cte.alias.name.clone()]));
@@ -188,7 +188,7 @@ pub fn resolve_table_references(
visitor.insert_relation(table_name);
}
CopyToSource::Query(query) => {
-                query.visit(visitor);
+                let _ = query.visit(visitor);
}
},
DFStatement::Explain(explain) => visit_statement(&explain.statement, visitor),
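Reviewer note: the `let _ =` bindings exist because `visit` returns a `ControlFlow` value, and `std::ops::ControlFlow` is `#[must_use]`; since `insert_relation` never short-circuits, the result can be discarded explicitly. A minimal sketch of the pattern (illustrative stand-in, not the sqlparser API):

```rust
use std::ops::ControlFlow;

// Stand-in for a visitor pass that records relations and never breaks.
fn visit_relations(names: &[&str]) -> ControlFlow<()> {
    for _name in names {
        // insert_relation(_name) would go here; it is idempotent.
    }
    ControlFlow::Continue(())
}

fn main() {
    // `ControlFlow` is `#[must_use]`, so silently dropping it would warn;
    // `let _ =` documents that ignoring the outcome is intentional.
    let _ = visit_relations(&["cte", "q"]);
}
```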
20 changes: 9 additions & 11 deletions datafusion/core/tests/physical_optimizer/limit_pushdown.rs
@@ -130,12 +130,11 @@ fn transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limi
LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;

let expected = [
"GlobalLimitExec: skip=0, fetch=5",
" CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192, fetch=5",
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
"CoalescePartitionsExec: fetch=5",
" CoalesceBatchesExec: target_batch_size=8192, fetch=5",
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];
assert_eq!(get_plan_string(&after_optimize), expected);

@@ -275,11 +274,10 @@ fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions(
LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;

let expected = [
"GlobalLimitExec: skip=0, fetch=5",
" CoalescePartitionsExec",
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
"CoalescePartitionsExec: fetch=5",
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];
assert_eq!(get_plan_string(&after_optimize), expected);

7 changes: 0 additions & 7 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -69,11 +69,6 @@ async fn explain_analyze_baseline_metrics() {
"FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
"metrics=[output_rows=99, elapsed_compute="
);
-    assert_metrics!(
-        &formatted,
-        "GlobalLimitExec: skip=0, fetch=3, ",
-        "metrics=[output_rows=3, elapsed_compute="
-    );
assert_metrics!(
&formatted,
"ProjectionExec: expr=[count(*)",
@@ -101,9 +96,7 @@ async fn explain_analyze_baseline_metrics() {

plan.as_any().downcast_ref::<sorts::sort::SortExec>().is_some()
|| plan.as_any().downcast_ref::<physical_plan::aggregates::AggregateExec>().is_some()
-            // CoalescePartitionsExec doesn't do any work so is not included
|| plan.as_any().downcast_ref::<physical_plan::filter::FilterExec>().is_some()
-            || plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
|| plan.as_any().downcast_ref::<physical_plan::limit::LocalLimitExec>().is_some()
|| plan.as_any().downcast_ref::<physical_plan::projection::ProjectionExec>().is_some()
|| plan.as_any().downcast_ref::<physical_plan::coalesce_batches::CoalesceBatchesExec>().is_some()
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -35,7 +35,7 @@ use datafusion_expr::logical_plan::{
Aggregate, Filter, LogicalPlan, Projection, Sort, Window,
};
use datafusion_expr::tree_node::replace_sort_expressions;
-use datafusion_expr::{col, BinaryExpr, Case, Expr, GroupingSet, Operator};
+use datafusion_expr::{col, BinaryExpr, Case, Expr, Operator};

const CSE_PREFIX: &str = "__common_expr";

9 changes: 0 additions & 9 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -145,15 +145,6 @@ pub fn pushdown_limit_helper(
global_state.skip = skip;
global_state.fetch = fetch;

-    if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {
-        // If the child is a `CoalescePartitionsExec`, we should not remove the limit:
-        // pushing it down through the `CoalescePartitionsExec` to each partition will not guarantee the limit.
-        // TODO: we may have a better solution if we can support with_fetch for limit inside CoalescePartitionsExec.
-        // Follow-up issue: https://github.com/apache/datafusion/issues/14446
-        global_state.satisfied = true;
-        return Ok((Transformed::no(pushdown_plan), global_state));
-    }

// Now the global state has the most recent information, we can remove
// the `LimitExec` plan. We will decide later if we should add it again
// or not.
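Reviewer note: the deleted guard above was the workaround tracked in issue #14446. Pushing a limit below `CoalescePartitionsExec` into individual partitions cannot guarantee the global row count, so the limit had to be preserved. Now that the merge operator supports `fetch` itself, the rule can fold the limit into it instead. A simplified sketch of that idea, using hypothetical minimal types rather than the real DataFusion traits:

```rust
// Sketch of why the special case could be deleted: once an operator can
// absorb a fetch via `with_fetch`, the global limit above it is folded in
// instead of being kept as a separate node. Names are illustrative only.
trait Plan {
    fn describe(&self) -> String;
    /// Returns a copy of the operator with the fetch absorbed, or None if
    /// the operator cannot guarantee the limit on its own.
    fn with_fetch(&self, fetch: Option<usize>) -> Option<Box<dyn Plan>>;
}

struct Coalesce {
    fetch: Option<usize>,
}

impl Plan for Coalesce {
    fn describe(&self) -> String {
        match self.fetch {
            Some(n) => format!("CoalescePartitionsExec: fetch={n}"),
            None => "CoalescePartitionsExec".to_string(),
        }
    }
    fn with_fetch(&self, fetch: Option<usize>) -> Option<Box<dyn Plan>> {
        Some(Box::new(Coalesce { fetch }))
    }
}

fn main() {
    let plan = Coalesce { fetch: None };
    // The pushdown rule folds `GlobalLimitExec: skip=0, fetch=5` into the
    // merge operator instead of keeping both nodes.
    let folded = plan.with_fetch(Some(5)).unwrap();
    println!("{}", folded.describe()); // CoalescePartitionsExec: fetch=5
}
```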
39 changes: 31 additions & 8 deletions datafusion/physical-plan/src/coalesce_partitions.rs
@@ -43,6 +43,8 @@ pub struct CoalescePartitionsExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
cache: PlanProperties,
+    /// Optional number of rows to fetch. Stops producing rows after this fetch
+    pub(crate) fetch: Option<usize>,
}

impl CoalescePartitionsExec {
@@ -53,6 +55,7 @@ impl CoalescePartitionsExec {
input,
metrics: ExecutionPlanMetricsSet::new(),
cache,
+            fetch: None,
}
}

@@ -82,9 +85,12 @@ impl DisplayAs for CoalescePartitionsExec {
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "CoalescePartitionsExec")
-            }
+            DisplayFormatType::Default | DisplayFormatType::Verbose => match self.fetch {
+                Some(fetch) => {
+                    write!(f, "CoalescePartitionsExec: fetch={fetch}")
+                }
+                None => write!(f, "CoalescePartitionsExec"),
+            },
}
}
}
@@ -115,9 +121,9 @@ impl ExecutionPlan for CoalescePartitionsExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(CoalescePartitionsExec::new(Arc::clone(
-            &children[0],
-        ))))
+        let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0]));
+        plan.fetch = self.fetch;
+        Ok(Arc::new(plan))
}

fn execute(
@@ -163,7 +169,11 @@
}

let stream = builder.build();
-                Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)))
+                Ok(Box::pin(ObservedStream::new(
+                    stream,
+                    baseline_metrics,
+                    self.fetch,
+                )))
}
}
}
@@ -173,7 +183,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
}

fn statistics(&self) -> Result<Statistics> {
-        self.input.statistics()
+        Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
}

fn supports_limit_pushdown(&self) -> bool {
@@ -183,6 +193,19 @@ impl ExecutionPlan for CoalescePartitionsExec {
fn cardinality_effect(&self) -> CardinalityEffect {
CardinalityEffect::Equal
}

+    fn fetch(&self) -> Option<usize> {
+        self.fetch
+    }
+
+    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
+        Some(Arc::new(CoalescePartitionsExec {
+            input: Arc::clone(&self.input),
+            fetch: limit,
+            metrics: self.metrics.clone(),
+            cache: self.cache.clone(),
+        }))
+    }
}

#[cfg(test)]
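Reviewer note: two details in this file are easy to miss. First, `with_new_children` now copies `self.fetch` onto the rebuilt node; without that, any pass that reconstructs the plan would silently drop an absorbed limit. Second, `with_fetch` always returns `Some`, so the pushdown rule can rely on the merge point enforcing the row budget. A tiny sketch of the first invariant (hypothetical types, not the DataFusion API):

```rust
#[derive(Clone, Debug, PartialEq)]
struct Node {
    fetch: Option<usize>,
    children: Vec<Node>,
}

impl Node {
    // The fix: carry `fetch` over instead of resetting it to None.
    fn with_new_children(&self, children: Vec<Node>) -> Node {
        Node { fetch: self.fetch, children }
    }
}

fn main() {
    let merged = Node { fetch: Some(5), children: vec![] };
    let rebuilt = merged.with_new_children(vec![]);
    // An absorbed limit must survive plan reconstruction.
    assert_eq!(rebuilt.fetch, Some(5));
}
```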
31 changes: 30 additions & 1 deletion datafusion/physical-plan/src/stream.rs
@@ -422,18 +422,44 @@ impl Stream for EmptyRecordBatchStream {
pub(crate) struct ObservedStream {
inner: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
+    fetch: Option<usize>,
+    produced: usize,
}

impl ObservedStream {
pub fn new(
inner: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
+        fetch: Option<usize>,
) -> Self {
Self {
inner,
baseline_metrics,
+            fetch,
+            produced: 0,
}
}

+    fn limit_reached(
+        &mut self,
+        poll: Poll<Option<Result<RecordBatch>>>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        let Some(fetch) = self.fetch else { return poll };
+
+        if self.produced >= fetch {
+            return Poll::Ready(None);
+        }
+
+        if let Poll::Ready(Some(Ok(batch))) = &poll {
+            if self.produced + batch.num_rows() > fetch {
+                let batch = batch.slice(0, fetch.saturating_sub(self.produced));
+                self.produced += batch.num_rows();
+                return Poll::Ready(Some(Ok(batch)));
+            };
+            self.produced += batch.num_rows()
+        }
+        poll
+    }
}

impl RecordBatchStream for ObservedStream {
@@ -449,7 +475,10 @@ impl futures::Stream for ObservedStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
-        let poll = self.inner.poll_next_unpin(cx);
+        let mut poll = self.inner.poll_next_unpin(cx);
+        if self.fetch.is_some() {
+            poll = self.limit_reached(poll);
+        }
self.baseline_metrics.record_poll(poll)
}
}
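Reviewer note: the heart of the feature is `limit_reached`: the stream counts the rows it has produced and, when the next batch would overshoot the budget, emits only the first `fetch - produced` rows. A standalone illustration of that slicing rule, a minimal sketch assuming the `arrow` crates DataFusion already builds on (the stream plumbing is omitted):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let fetch = 5_usize;    // total row budget for the stream
    let produced = 3_usize; // rows emitted by earlier batches

    // Same guard as in `limit_reached`: an 8-row batch would overshoot the
    // remaining budget (5 - 3 = 2 rows), so it is trimmed before emission.
    if produced + batch.num_rows() > fetch {
        let trimmed = batch.slice(0, fetch.saturating_sub(produced));
        assert_eq!(trimmed.num_rows(), 2);
    }
}
```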
12 changes: 10 additions & 2 deletions datafusion/physical-plan/src/union.rs
@@ -229,7 +229,11 @@ impl ExecutionPlan for UnionExec {
if partition < input.output_partitioning().partition_count() {
let stream = input.execute(partition, context)?;
debug!("Found a Union partition to execute");
-                return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+                return Ok(Box::pin(ObservedStream::new(
+                    stream,
+                    baseline_metrics,
+                    None,
+                )));
} else {
partition -= input.output_partitioning().partition_count();
}
@@ -417,7 +421,11 @@ impl ExecutionPlan for InterleaveExec {
self.schema(),
input_stream_vec,
));
-            return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+            return Ok(Box::pin(ObservedStream::new(
+                stream,
+                baseline_metrics,
+                None,
+            )));
}

warn!("Error in InterleaveExec: Partition {} not found", partition);
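Reviewer note: `UnionExec` and `InterleaveExec` pass `None` here because neither operator absorbs a limit itself; with no fetch set, `ObservedStream` skips the counting path entirely and forwards batches untouched. A simplified sketch of that behavior (hypothetical helper, not the DataFusion API):

```rust
// With no fetch the stream passes rows through; with a fetch it caps the
// batch at the remaining budget.
fn rows_to_emit(batch_rows: usize, fetch: Option<usize>, produced: usize) -> usize {
    match fetch {
        None => batch_rows, // UnionExec / InterleaveExec case: pass through
        Some(f) => batch_rows.min(f.saturating_sub(produced)),
    }
}

fn main() {
    assert_eq!(rows_to_emit(8, None, 0), 8);
    assert_eq!(rows_to_emit(8, Some(5), 3), 2);
}
```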
23 changes: 11 additions & 12 deletions datafusion/sqllogictest/test_files/aggregate.slt
@@ -4971,18 +4971,17 @@
03)----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[min(aggregate_test_100.c1)]]
04)------TableScan: aggregate_test_100 projection=[c1, c3]
physical_plan
-01)GlobalLimitExec: skip=0, fetch=5
-02)--CoalescePartitionsExec
-03)----AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-04)------CoalesceBatchesExec: target_batch_size=8192
-05)--------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-07)------------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[min(aggregate_test_100.c1)]
-08)--------------CoalesceBatchesExec: target_batch_size=8192
-09)----------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
-10)------------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)]
-11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-12)----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
+01)CoalescePartitionsExec: fetch=5
+02)--AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4
+05)--------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+06)----------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[min(aggregate_test_100.c1)]
+07)------------CoalesceBatchesExec: target_batch_size=8192
+08)--------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
+09)----------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+11)--------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true


#
5 changes: 2 additions & 3 deletions datafusion/sqllogictest/test_files/limit.slt
@@ -852,9 +852,8 @@
01)ProjectionExec: expr=[foo@0 as foo]
02)--SortExec: TopK(fetch=1000), expr=[part_key@1 ASC NULLS LAST], preserve_partitioning=[false]
03)----ProjectionExec: expr=[1 as foo, part_key@0 as part_key]
-04)------GlobalLimitExec: skip=0, fetch=1
-05)--------CoalescePartitionsExec
-06)----------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], limit=1
+04)------CoalescePartitionsExec: fetch=1
+05)--------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], limit=1

query I
with selection as (
11 changes: 5 additions & 6 deletions datafusion/sqllogictest/test_files/repartition.slt
@@ -121,12 +121,11 @@
02)--Filter: sink_table.c3 > Int16(0)
03)----TableScan: sink_table projection=[c1, c2, c3]
physical_plan
-01)GlobalLimitExec: skip=0, fetch=5
-02)--CoalescePartitionsExec
-03)----CoalesceBatchesExec: target_batch_size=8192, fetch=5
-04)------FilterExec: c3@2 > 0
-05)--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
-06)----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true
+01)CoalescePartitionsExec: fetch=5
+02)--CoalesceBatchesExec: target_batch_size=8192, fetch=5
+03)----FilterExec: c3@2 > 0
+04)------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
+05)--------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

# Start repratition on empty column test.
# See https://github.com/apache/datafusion/issues/12057
43 changes: 21 additions & 22 deletions datafusion/sqllogictest/test_files/union.slt
@@ -510,28 +510,27 @@
19)------------Projection: Int64(1) AS c1
20)--------------EmptyRelation
physical_plan
-01)GlobalLimitExec: skip=0, fetch=3
-02)--CoalescePartitionsExec
-03)----UnionExec
-04)------ProjectionExec: expr=[count(*)@0 as cnt]
-05)--------AggregateExec: mode=Final, gby=[], aggr=[count(*)]
-06)----------CoalescePartitionsExec
-07)------------AggregateExec: mode=Partial, gby=[], aggr=[count(*)]
-08)--------------ProjectionExec: expr=[]
-09)----------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[]
-10)------------------CoalesceBatchesExec: target_batch_size=2
-11)--------------------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
-12)----------------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
-13)------------------------CoalesceBatchesExec: target_batch_size=2
-14)--------------------------FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434, projection=[c1@0]
-15)----------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-16)------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true
-17)------ProjectionExec: expr=[1 as cnt]
-18)--------PlaceholderRowExec
-19)------ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
-20)--------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
-21)----------ProjectionExec: expr=[1 as c1]
-22)------------PlaceholderRowExec
+01)CoalescePartitionsExec: fetch=3
+02)--UnionExec
+03)----ProjectionExec: expr=[count(*)@0 as cnt]
+04)------AggregateExec: mode=Final, gby=[], aggr=[count(*)]
+05)--------CoalescePartitionsExec
+06)----------AggregateExec: mode=Partial, gby=[], aggr=[count(*)]
+07)------------ProjectionExec: expr=[]
+08)--------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[]
+09)----------------CoalesceBatchesExec: target_batch_size=2
+10)------------------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
+11)--------------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
+12)----------------------CoalesceBatchesExec: target_batch_size=2
+13)------------------------FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434, projection=[c1@0]
+14)--------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+15)----------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true
+16)----ProjectionExec: expr=[1 as cnt]
+17)------PlaceholderRowExec
+18)----ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
+19)------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
+20)--------ProjectionExec: expr=[1 as c1]
+21)----------PlaceholderRowExec


########