diff --git a/datafusion/core/src/catalog_common/mod.rs b/datafusion/core/src/catalog_common/mod.rs
index 68c78dda4899..ff04a6920ad4 100644
--- a/datafusion/core/src/catalog_common/mod.rs
+++ b/datafusion/core/src/catalog_common/mod.rs
@@ -117,7 +117,7 @@ pub fn resolve_table_references(
                 if !with.recursive {
                     // This is a bit hackish as the CTE will be visited again as part of visiting `q`,
                     // but thankfully `insert_relation` is idempotent.
-                    cte.visit(self);
+                    let _ = cte.visit(self);
                 }
                 self.ctes_in_scope
                     .push(ObjectName(vec![cte.alias.name.clone()]));
@@ -188,7 +188,7 @@ pub fn resolve_table_references(
             visitor.insert_relation(table_name);
         }
         CopyToSource::Query(query) => {
-            query.visit(visitor);
+            let _ = query.visit(visitor);
         }
     },
     DFStatement::Explain(explain) => visit_statement(&explain.statement, visitor),
diff --git a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
index 1b4c28d41d19..e7e943af2786 100644
--- a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
@@ -130,12 +130,11 @@ fn transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limi
         LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
 
     let expected = [
-        "GlobalLimitExec: skip=0, fetch=5",
-        "  CoalescePartitionsExec",
-        "    CoalesceBatchesExec: target_batch_size=8192, fetch=5",
-        "      FilterExec: c3@2 > 0",
-        "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-        "          StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
+        "CoalescePartitionsExec: fetch=5",
+        "  CoalesceBatchesExec: target_batch_size=8192, fetch=5",
+        "    FilterExec: c3@2 > 0",
+        "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+        "        StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
     ];
 
     assert_eq!(get_plan_string(&after_optimize), expected);
@@ -275,11 +274,10 @@ fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions(
         LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
 
     let expected = [
-        "GlobalLimitExec: skip=0, fetch=5",
-        "  CoalescePartitionsExec",
-        "    FilterExec: c3@2 > 0",
-        "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-        "        StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
+        "CoalescePartitionsExec: fetch=5",
+        "  FilterExec: c3@2 > 0",
+        "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
+        "      StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
     ];
 
     assert_eq!(get_plan_string(&after_optimize), expected);
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 39fd492786bc..1f81e0471040 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -69,11 +69,6 @@ async fn explain_analyze_baseline_metrics() {
         "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
         "metrics=[output_rows=99, elapsed_compute="
     );
-    assert_metrics!(
-        &formatted,
-        "GlobalLimitExec: skip=0, fetch=3, ",
-        "metrics=[output_rows=3, elapsed_compute="
-    );
     assert_metrics!(
         &formatted,
         "ProjectionExec: expr=[count(*)",
@@ -101,9 +96,7 @@ async fn explain_analyze_baseline_metrics() {
         plan.as_any().downcast_ref::().is_some()
             || plan.as_any().downcast_ref::().is_some()
-            // CoalescePartitionsExec doesn't do any work so is not included
             || plan.as_any().downcast_ref::().is_some()
-            || plan.as_any().downcast_ref::().is_some()
             || plan.as_any().downcast_ref::().is_some()
             || plan.as_any().downcast_ref::().is_some()
             || plan.as_any().downcast_ref::().is_some()
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 09c0368d0c0e..c8f2e9e8be5e 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -35,7 +35,7 @@ use datafusion_expr::logical_plan::{
     Aggregate, Filter, LogicalPlan, Projection, Sort, Window,
 };
 use datafusion_expr::tree_node::replace_sort_expressions;
-use datafusion_expr::{col, BinaryExpr, Case, Expr, GroupingSet, Operator};
+use datafusion_expr::{col, BinaryExpr, Case, Expr, Operator};
 
 const CSE_PREFIX: &str = "__common_expr";
diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs
index ffeb922fb256..3be278b13dcd 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -145,15 +145,6 @@ pub fn pushdown_limit_helper(
     global_state.skip = skip;
     global_state.fetch = fetch;
 
-    if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {
-        // If the child is a `CoalescePartitionsExec`, we should not remove the limit
-        // the push_down through the `CoalescePartitionsExec` to each partition will not guarantee the limit.
-        // TODO: we may have a better solution if we can support with_fetch for limit inside CoalescePartitionsExec.
-        // Follow-up issue: https://github.com/apache/datafusion/issues/14446
-        global_state.satisfied = true;
-        return Ok((Transformed::no(pushdown_plan), global_state));
-    }
-
     // Now the global state has the most recent information, we can remove
     // the `LimitExec` plan. We will decide later if we should add it again
     // or not.
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index f9d4ec6a1a34..26baf3a3b247 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -43,6 +43,8 @@ pub struct CoalescePartitionsExec {
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     cache: PlanProperties,
+    /// Optional number of rows to fetch. Stops producing rows after this fetch
+    pub(crate) fetch: Option<usize>,
 }
 
 impl CoalescePartitionsExec {
@@ -53,6 +55,7 @@ impl CoalescePartitionsExec {
             input,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
+            fetch: None,
         }
     }
 
@@ -82,9 +85,12 @@ impl DisplayAs for CoalescePartitionsExec {
         f: &mut std::fmt::Formatter,
     ) -> std::fmt::Result {
         match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "CoalescePartitionsExec")
-            }
+            DisplayFormatType::Default | DisplayFormatType::Verbose => match self.fetch {
+                Some(fetch) => {
+                    write!(f, "CoalescePartitionsExec: fetch={fetch}")
+                }
+                None => write!(f, "CoalescePartitionsExec"),
+            },
         }
     }
 }
@@ -115,9 +121,9 @@ impl ExecutionPlan for CoalescePartitionsExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(CoalescePartitionsExec::new(Arc::clone(
-            &children[0],
-        ))))
+        let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0]));
+        plan.fetch = self.fetch;
+        Ok(Arc::new(plan))
     }
 
     fn execute(
@@ -163,7 +169,11 @@ impl ExecutionPlan for CoalescePartitionsExec {
                 }
 
                 let stream = builder.build();
-                Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)))
+                Ok(Box::pin(ObservedStream::new(
+                    stream,
+                    baseline_metrics,
+                    self.fetch,
+                )))
             }
         }
     }
@@ -173,7 +183,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
     }
 
     fn statistics(&self) -> Result<Statistics> {
-        self.input.statistics()
+        Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
     }
 
     fn supports_limit_pushdown(&self) -> bool {
@@ -183,6 +193,19 @@ impl ExecutionPlan for CoalescePartitionsExec {
     fn cardinality_effect(&self) -> CardinalityEffect {
         CardinalityEffect::Equal
     }
+
+    fn fetch(&self) -> Option<usize> {
+        self.fetch
+    }
+
+    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
+        Some(Arc::new(CoalescePartitionsExec {
+            input: Arc::clone(&self.input),
+            fetch: limit,
+            metrics: self.metrics.clone(),
+            cache: self.cache.clone(),
+        }))
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs
index 9220646653e6..15f180a21d78 100644
--- a/datafusion/physical-plan/src/stream.rs
+++ b/datafusion/physical-plan/src/stream.rs
@@ -422,18 +422,44 @@ impl Stream for EmptyRecordBatchStream {
 pub(crate) struct ObservedStream {
     inner: SendableRecordBatchStream,
     baseline_metrics: BaselineMetrics,
+    fetch: Option<usize>,
+    produced: usize,
 }
 
 impl ObservedStream {
     pub fn new(
         inner: SendableRecordBatchStream,
         baseline_metrics: BaselineMetrics,
+        fetch: Option<usize>,
     ) -> Self {
         Self {
             inner,
             baseline_metrics,
+            fetch,
+            produced: 0,
         }
     }
+
+    fn limit_reached(
+        &mut self,
+        poll: Poll<Option<Result<RecordBatch>>>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        let Some(fetch) = self.fetch else { return poll };
+
+        if self.produced >= fetch {
+            return Poll::Ready(None);
+        }
+
+        if let Poll::Ready(Some(Ok(batch))) = &poll {
+            if self.produced + batch.num_rows() > fetch {
+                let batch = batch.slice(0, fetch.saturating_sub(self.produced));
+                self.produced += batch.num_rows();
+                return Poll::Ready(Some(Ok(batch)));
+            };
+            self.produced += batch.num_rows()
+        }
+        poll
+    }
 }
 
 impl RecordBatchStream for ObservedStream {
@@ -449,7 +475,10 @@ impl futures::Stream for ObservedStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        let poll = self.inner.poll_next_unpin(cx);
+        let mut poll = self.inner.poll_next_unpin(cx);
+        if self.fetch.is_some() {
+            poll = self.limit_reached(poll);
+        }
         self.baseline_metrics.record_poll(poll)
     }
 }
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 433dda870def..637c2853fa05 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -229,7 +229,11 @@ impl ExecutionPlan for UnionExec {
             if partition < input.output_partitioning().partition_count() {
                 let stream = input.execute(partition, context)?;
                 debug!("Found a Union partition to execute");
-                return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+                return Ok(Box::pin(ObservedStream::new(
+                    stream,
+                    baseline_metrics,
+                    None,
+                )));
             } else {
                 partition -= input.output_partitioning().partition_count();
             }
@@ -417,7 +421,11 @@ impl ExecutionPlan for InterleaveExec {
             self.schema(),
             input_stream_vec,
         ));
-        return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+        return Ok(Box::pin(ObservedStream::new(
+            stream,
+            baseline_metrics,
+            None,
+        )));
     }
 
     warn!("Error in InterleaveExec: Partition {} not found", partition);
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 03098c5e6ef7..f96e6df90aae 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -4971,18 +4971,17 @@ logical_plan
 03)----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[min(aggregate_test_100.c1)]]
 04)------TableScan: aggregate_test_100 projection=[c1, c3]
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=5
-02)--CoalescePartitionsExec
-03)----AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-04)------CoalesceBatchesExec: target_batch_size=8192
-05)--------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-07)------------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[min(aggregate_test_100.c1)]
-08)--------------CoalesceBatchesExec: target_batch_size=8192
-09)----------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
-10)------------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)]
-11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-12)----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
+01)CoalescePartitionsExec: fetch=5
+02)--AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4
+05)--------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+06)----------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[min(aggregate_test_100.c1)]
+07)------------CoalesceBatchesExec: target_batch_size=8192
+08)--------------RepartitionExec: partitioning=Hash([c3@0], 4), input_partitions=4
+09)----------------AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[min(aggregate_test_100.c1)]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+11)--------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
 
 #
diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt
index 65f35d40fcf5..a999149418ef 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -852,9 +852,8 @@ physical_plan
 01)ProjectionExec: expr=[foo@0 as foo]
 02)--SortExec: TopK(fetch=1000), expr=[part_key@1 ASC NULLS LAST], preserve_partitioning=[false]
 03)----ProjectionExec: expr=[1 as foo, part_key@0 as part_key]
-04)------GlobalLimitExec: skip=0, fetch=1
-05)--------CoalescePartitionsExec
-06)----------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], limit=1
+04)------CoalescePartitionsExec: fetch=1
+05)--------ParquetExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]}, projection=[part_key], limit=1
 
 query I
 with selection as (
diff --git a/datafusion/sqllogictest/test_files/repartition.slt b/datafusion/sqllogictest/test_files/repartition.slt
index 630674bb09ed..36a326928fad 100644
--- a/datafusion/sqllogictest/test_files/repartition.slt
+++ b/datafusion/sqllogictest/test_files/repartition.slt
@@ -121,12 +121,11 @@ logical_plan
 02)--Filter: sink_table.c3 > Int16(0)
 03)----TableScan: sink_table projection=[c1, c2, c3]
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=5
-02)--CoalescePartitionsExec
-03)----CoalesceBatchesExec: target_batch_size=8192, fetch=5
-04)------FilterExec: c3@2 > 0
-05)--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
-06)----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true
+01)CoalescePartitionsExec: fetch=5
+02)--CoalesceBatchesExec: target_batch_size=8192, fetch=5
+03)----FilterExec: c3@2 > 0
+04)------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
+05)--------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true
 
 # Start repratition on empty column test.
 # See https://github.com/apache/datafusion/issues/12057
diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt
index fb7afdda2ea8..cc2e0569c0e1 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -510,28 +510,27 @@ logical_plan
 19)------------Projection: Int64(1) AS c1
 20)--------------EmptyRelation
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=3
-02)--CoalescePartitionsExec
-03)----UnionExec
-04)------ProjectionExec: expr=[count(*)@0 as cnt]
-05)--------AggregateExec: mode=Final, gby=[], aggr=[count(*)]
-06)----------CoalescePartitionsExec
-07)------------AggregateExec: mode=Partial, gby=[], aggr=[count(*)]
-08)--------------ProjectionExec: expr=[]
-09)----------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[]
-10)------------------CoalesceBatchesExec: target_batch_size=2
-11)--------------------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
-12)----------------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
-13)------------------------CoalesceBatchesExec: target_batch_size=2
-14)--------------------------FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434, projection=[c1@0]
-15)----------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-16)------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true
-17)------ProjectionExec: expr=[1 as cnt]
-18)--------PlaceholderRowExec
-19)------ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
-20)--------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
-21)----------ProjectionExec: expr=[1 as c1]
-22)------------PlaceholderRowExec
+01)CoalescePartitionsExec: fetch=3
+02)--UnionExec
+03)----ProjectionExec: expr=[count(*)@0 as cnt]
+04)------AggregateExec: mode=Final, gby=[], aggr=[count(*)]
+05)--------CoalescePartitionsExec
+06)----------AggregateExec: mode=Partial, gby=[], aggr=[count(*)]
+07)------------ProjectionExec: expr=[]
+08)--------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[]
+09)----------------CoalesceBatchesExec: target_batch_size=2
+10)------------------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
+11)--------------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
+12)----------------------CoalesceBatchesExec: target_batch_size=2
+13)------------------------FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434, projection=[c1@0]
+14)--------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+15)----------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], has_header=true
+16)----ProjectionExec: expr=[1 as cnt]
+17)------PlaceholderRowExec
+18)----ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
+19)------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
+20)--------ProjectionExec: expr=[1 as c1]
+21)----------PlaceholderRowExec
 
########
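
How the new fetch limit works, in brief: ObservedStream now carries an optional row budget (`fetch`) plus a `produced` counter and applies them at poll time. Batches pass through untouched while under budget, the batch that would overshoot is sliced down to the remaining rows, and every later poll reports end-of-stream. A minimal standalone sketch of the same bookkeeping, with a plain Vec<i32> standing in for an Arrow RecordBatch (the FetchLimiter type here is illustrative only, not DataFusion code):

// Hedged sketch of the row-budget logic added to ObservedStream above.
struct FetchLimiter {
    fetch: Option<usize>, // row budget; None means unlimited
    produced: usize,      // rows emitted so far
}

impl FetchLimiter {
    /// Truncate `batch` so the total output never exceeds `fetch`;
    /// return None once the budget is exhausted (end of stream).
    fn apply(&mut self, batch: Vec<i32>) -> Option<Vec<i32>> {
        let Some(fetch) = self.fetch else { return Some(batch) };
        if self.produced >= fetch {
            return None; // budget already spent: stop producing
        }
        let remaining = fetch - self.produced;
        let out: Vec<i32> = batch.into_iter().take(remaining).collect();
        self.produced += out.len();
        Some(out)
    }
}

fn main() {
    let mut limiter = FetchLimiter { fetch: Some(5), produced: 0 };
    // Two 3-row batches against fetch=5: the second batch is sliced to 2 rows.
    assert_eq!(limiter.apply(vec![1, 2, 3]), Some(vec![1, 2, 3]));
    assert_eq!(limiter.apply(vec![4, 5, 6]), Some(vec![4, 5]));
    assert_eq!(limiter.apply(vec![7]), None);
    println!("fetch budget honored");
}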
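
And the user-visible API change, as a hedged usage sketch (the crate paths and the use of EmptyExec as a stand-in input are assumptions about this DataFusion version, not part of the patch):

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::{displayable, ExecutionPlan};

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
    let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));

    // Before this patch, the default ExecutionPlan::with_fetch returned None for
    // CoalescePartitionsExec; the override above returns a fetch-aware copy.
    let coalesce = CoalescePartitionsExec::new(input);
    let limited = coalesce
        .with_fetch(Some(5))
        .expect("fetch supported after this patch");

    assert_eq!(limited.fetch(), Some(5));
    // Should render as "CoalescePartitionsExec: fetch=5", matching the updated plans.
    println!("{}", displayable(limited.as_ref()).one_line());
}

With with_fetch in place, LimitPushdown no longer needs to keep a separate GlobalLimitExec above a CoalescePartitionsExec: the fetch is folded into the coalescing operator itself, which is exactly the plan-shape change visible in the .slt expectations above.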