From 07804384cbdcdd2861ec8a279632da32245e28f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Berkay=20=C5=9Eahin?= <124376117+berkaysynnada@users.noreply.github.com> Date: Mon, 22 Apr 2024 22:35:48 +0300 Subject: [PATCH] Projection Expression - Input Field Inconsistencies during Projection (#10088) * agg fixes * test updates * fixing count mismatch * Update aggregate_statistics.rs * catch different names * minor --- .../aggregate_statistics.rs | 9 +- datafusion/core/src/physical_planner.rs | 31 +++++-- .../functions-aggregate/src/first_last.rs | 41 +++++++--- .../src/equivalence/projection.rs | 6 +- .../test_files/agg_func_substitute.slt | 8 +- .../sqllogictest/test_files/aggregate.slt | 12 +-- .../sqllogictest/test_files/distinct_on.slt | 4 +- .../sqllogictest/test_files/group_by.slt | 82 +++++++++---------- datafusion/sqllogictest/test_files/joins.slt | 8 +- .../sqllogictest/test_files/subquery.slt | 26 ++++++ 10 files changed, 145 insertions(+), 82 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index df54222270ce..98f8884e4985 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -35,9 +35,6 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; #[derive(Default)] pub struct AggregateStatistics {} -/// The name of the column corresponding to [`COUNT_STAR_EXPANSION`] -const COUNT_STAR_NAME: &str = "COUNT(*)"; - impl AggregateStatistics { #[allow(missing_docs)] pub fn new() -> Self { @@ -144,7 +141,7 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> Option> fn take_optimizable_table_count( agg_expr: &dyn AggregateExpr, stats: &Statistics, -) -> Option<(ScalarValue, &'static str)> { +) -> Option<(ScalarValue, String)> { if let (&Precision::Exact(num_rows), Some(casted_expr)) = ( &stats.num_rows, agg_expr.as_any().downcast_ref::(), @@ -158,7 +155,7 @@ fn take_optimizable_table_count( if lit_expr.value() == &COUNT_STAR_EXPANSION { return Some(( ScalarValue::Int64(Some(num_rows as i64)), - COUNT_STAR_NAME, + casted_expr.name().to_owned(), )); } } @@ -427,7 +424,7 @@ pub(crate) mod tests { /// What name would this aggregate produce in a plan? fn column_name(&self) -> &'static str { match self { - Self::CountStar => COUNT_STAR_NAME, + Self::CountStar => "COUNT(*)", Self::ColumnA(_) => "COUNT(a)", } } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 301f68c0f24b..e6785b1dec2a 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -87,6 +87,7 @@ use datafusion_expr::expr::{ WindowFunction, }; use datafusion_expr::expr_rewriter::unnormalize_cols; +use datafusion_expr::expr_vec_fmt; use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary; use datafusion_expr::{ DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, @@ -108,6 +109,7 @@ fn create_function_physical_name( fun: &str, distinct: bool, args: &[Expr], + order_by: Option<&Vec>, ) -> Result { let names: Vec = args .iter() @@ -118,7 +120,12 @@ fn create_function_physical_name( true => "DISTINCT ", false => "", }; - Ok(format!("{}({}{})", fun, distinct_str, names.join(","))) + + let phys_name = format!("{}({}{})", fun, distinct_str, names.join(",")); + + Ok(order_by + .map(|order_by| format!("{} ORDER BY [{}]", phys_name, expr_vec_fmt!(order_by))) + .unwrap_or(phys_name)) } fn physical_name(e: &Expr) -> Result { @@ -238,22 +245,30 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result { return internal_err!("Function `Expr` with name should be resolved."); } - create_function_physical_name(fun.name(), false, &fun.args) + create_function_physical_name(fun.name(), false, &fun.args, None) } - Expr::WindowFunction(WindowFunction { fun, args, .. }) => { - create_function_physical_name(&fun.to_string(), false, args) + Expr::WindowFunction(WindowFunction { + fun, + args, + order_by, + .. + }) => { + create_function_physical_name(&fun.to_string(), false, args, Some(order_by)) } Expr::AggregateFunction(AggregateFunction { func_def, distinct, args, filter, - order_by: _, + order_by, null_treatment: _, }) => match func_def { - AggregateFunctionDefinition::BuiltIn(..) => { - create_function_physical_name(func_def.name(), *distinct, args) - } + AggregateFunctionDefinition::BuiltIn(..) => create_function_physical_name( + func_def.name(), + *distinct, + args, + order_by.as_ref(), + ), AggregateFunctionDefinition::UDF(fun) => { // TODO: Add support for filter by in AggregateUDF if filter.is_some() { diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index f8b388a4f3b1..8dc4cee87a3b 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -415,11 +415,9 @@ impl FirstValuePhysicalExpr { } pub fn convert_to_last(self) -> LastValuePhysicalExpr { - let name = if self.name.starts_with("FIRST") { - format!("LAST{}", &self.name[5..]) - } else { - format!("LAST_VALUE({})", self.expr) - }; + let mut name = format!("LAST{}", &self.name[5..]); + replace_order_by_clause(&mut name); + let FirstValuePhysicalExpr { expr, input_data_type, @@ -593,11 +591,9 @@ impl LastValuePhysicalExpr { } pub fn convert_to_first(self) -> FirstValuePhysicalExpr { - let name = if self.name.starts_with("LAST") { - format!("FIRST{}", &self.name[4..]) - } else { - format!("FIRST_VALUE({})", self.expr) - }; + let mut name = format!("FIRST{}", &self.name[4..]); + replace_order_by_clause(&mut name); + let LastValuePhysicalExpr { expr, input_data_type, @@ -905,6 +901,31 @@ fn convert_to_sort_cols( .collect::>() } +fn replace_order_by_clause(order_by: &mut String) { + let suffixes = [ + (" DESC NULLS FIRST]", " ASC NULLS LAST]"), + (" ASC NULLS FIRST]", " DESC NULLS LAST]"), + (" DESC NULLS LAST]", " ASC NULLS FIRST]"), + (" ASC NULLS LAST]", " DESC NULLS FIRST]"), + ]; + + if let Some(start) = order_by.find("ORDER BY [") { + if let Some(end) = order_by[start..].find(']') { + let order_by_start = start + 9; + let order_by_end = start + end; + + let column_order = &order_by[order_by_start..=order_by_end]; + for &(suffix, replacement) in &suffixes { + if column_order.ends_with(suffix) { + let new_order = column_order.replace(suffix, replacement); + order_by.replace_range(order_by_start..=order_by_end, &new_order); + break; + } + } + } + } +} + #[cfg(test)] mod tests { use arrow::array::Int64Array; diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs index 8c747ab8a244..18e350b0977e 100644 --- a/datafusion/physical-expr/src/equivalence/projection.rs +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use arrow::datatypes::SchemaRef; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::Result; +use datafusion_common::{internal_err, Result}; use crate::expressions::Column; use crate::PhysicalExpr; @@ -67,6 +67,10 @@ impl ProjectionMapping { // Conceptually, `source_expr` and `expression` should be the same. let idx = col.index(); let matching_input_field = input_schema.field(idx); + if col.name() != matching_input_field.name() { + return internal_err!("Input field name {} does not match with the projection expression {}", + matching_input_field.name(),col.name()) + } let matching_input_column = Column::new(matching_input_field.name(), idx); Ok(Transformed::yes(Arc::new(matching_input_column))) diff --git a/datafusion/sqllogictest/test_files/agg_func_substitute.slt b/datafusion/sqllogictest/test_files/agg_func_substitute.slt index 811bfa864ffc..7beb20a52134 100644 --- a/datafusion/sqllogictest/test_files/agg_func_substitute.slt +++ b/datafusion/sqllogictest/test_files/agg_func_substitute.slt @@ -44,11 +44,11 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, c] physical_plan 01)ProjectionExec: expr=[a@0 as a, NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result] -02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted +02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted 03)----SortExec: expr=[a@0 ASC NULLS LAST] 04)------CoalesceBatchesExec: target_batch_size=8192 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 -06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted +06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted 07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true @@ -64,11 +64,11 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, c] physical_plan 01)ProjectionExec: expr=[a@0 as a, NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result] -02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted +02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted 03)----SortExec: expr=[a@0 ASC NULLS LAST] 04)------CoalesceBatchesExec: target_batch_size=8192 05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 -06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted +06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted 07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 5e4fb10456e3..8b5b84e76650 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -132,9 +132,9 @@ logical_plan 01)Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]] 02)--TableScan: agg_order projection=[c1, c2, c3] physical_plan -01)AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)] +01)AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]] 02)--CoalescePartitionsExec -03)----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1)] +03)----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]] 04)------SortExec: expr=[c2@1 DESC,c3@2 ASC NULLS LAST] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], has_header=true @@ -3520,9 +3520,9 @@ logical_plan 01)Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]] 02)--TableScan: convert_first_last_table projection=[c1, c3] physical_plan -01)AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1)] +01)AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]] 02)--CoalescePartitionsExec -03)----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1)] +03)----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 ASC NULLS LAST]] 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c3], output_orderings=[[c1@0 ASC NULLS LAST], [c3@1 ASC NULLS LAST]], has_header=true @@ -3534,8 +3534,8 @@ logical_plan 01)Aggregate: groupBy=[[]], aggr=[[LAST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]] 02)--TableScan: convert_first_last_table projection=[c1, c2] physical_plan -01)AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1)] +01)AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]] 02)--CoalescePartitionsExec -03)----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1)] +03)----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 DESC NULLS FIRST]] 04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c2], output_orderings=[[c1@0 ASC NULLS LAST], [c2@1 DESC]], has_header=true diff --git a/datafusion/sqllogictest/test_files/distinct_on.slt b/datafusion/sqllogictest/test_files/distinct_on.slt index fc0df1d5f65b..972c935cee99 100644 --- a/datafusion/sqllogictest/test_files/distinct_on.slt +++ b/datafusion/sqllogictest/test_files/distinct_on.slt @@ -97,10 +97,10 @@ physical_plan 01)ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@1 as c3, FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@2 as c2] 02)--SortPreservingMergeExec: [c1@0 ASC NULLS LAST] 03)----SortExec: expr=[c1@0 ASC NULLS LAST] -04)------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)] +04)------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]] 05)--------CoalesceBatchesExec: target_batch_size=8192 06)----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4 -07)------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)] +07)------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]] 08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 7bef73833745..e015f7b01d0c 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -2016,10 +2016,10 @@ physical_plan 01)SortPreservingMergeExec: [col0@0 ASC NULLS LAST] 02)--SortExec: expr=[col0@0 ASC NULLS LAST] 03)----ProjectionExec: expr=[col0@0 as col0, LAST_VALUE(r.col1) ORDER BY [r.col0 ASC NULLS LAST]@3 as last_col1] -04)------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)] +04)------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1) ORDER BY [r.col0 ASC NULLS LAST]] 05)--------CoalesceBatchesExec: target_batch_size=8192 06)----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4 -07)------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)] +07)------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1) ORDER BY [r.col0 ASC NULLS LAST]] 08)--------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1] 09)----------------CoalesceBatchesExec: target_batch_size=8192 10)------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)] @@ -2145,7 +2145,7 @@ logical_plan 03)----TableScan: annotated_data_infinite2 projection=[a, c, d] physical_plan 01)ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as summation1] -02)--AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=PartiallySorted([1]) +02)--AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]], ordering_mode=PartiallySorted([1]) 03)----StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] query III @@ -2178,7 +2178,7 @@ logical_plan 03)----TableScan: annotated_data_infinite2 projection=[a, b, c] physical_plan 01)ProjectionExec: expr=[a@0 as a, b@1 as b, FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as first_c] -02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=Sorted +02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]], ordering_mode=Sorted 03)----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST] query III @@ -2204,7 +2204,7 @@ logical_plan 03)----TableScan: annotated_data_infinite2 projection=[a, b, c] physical_plan 01)ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as last_c] -02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=Sorted +02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]], ordering_mode=Sorted 03)----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST] query III @@ -2292,7 +2292,7 @@ logical_plan 01)Aggregate: groupBy=[[annotated_data_infinite2.a, annotated_data_infinite2.b]], aggr=[[ARRAY_AGG(annotated_data_infinite2.d) ORDER BY [annotated_data_infinite2.d ASC NULLS LAST]]] 02)--TableScan: annotated_data_infinite2 projection=[a, b, d] physical_plan -01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[ARRAY_AGG(annotated_data_infinite2.d)], ordering_mode=Sorted +01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[ARRAY_AGG(annotated_data_infinite2.d) ORDER BY [annotated_data_infinite2.d ASC NULLS LAST]], ordering_mode=Sorted 02)--PartialSortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,d@2 ASC NULLS LAST], common_prefix_length=[2] 03)----StreamingTableExec: partition_sizes=1, projection=[a, b, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST] @@ -2464,7 +2464,7 @@ logical_plan 03)----TableScan: sales_global projection=[country, amount] physical_plan 01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts] -02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)] +02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]] 03)----SortExec: expr=[amount@1 ASC NULLS LAST] 04)------MemoryExec: partitions=1, partition_sizes=[1] @@ -2494,7 +2494,7 @@ logical_plan 04)------TableScan: sales_global projection=[country, amount] physical_plan 01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1] -02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)] +02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)] 03)----SortExec: expr=[amount@1 DESC] 04)------MemoryExec: partitions=1, partition_sizes=[1] @@ -2538,7 +2538,7 @@ logical_plan 05)--------TableScan: sales_global projection=[country, amount] physical_plan 01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1] -02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=Sorted +02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)], ordering_mode=Sorted 03)----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC] 04)------MemoryExec: partitions=1, partition_sizes=[1] @@ -2574,7 +2574,7 @@ logical_plan 05)--------TableScan: sales_global projection=[zip_code, country, amount] physical_plan 01)ProjectionExec: expr=[country@0 as country, zip_code@1 as zip_code, ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@2 as amounts, SUM(s.amount)@3 as sum1] -02)--AggregateExec: mode=Single, gby=[country@1 as country, zip_code@0 as zip_code], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=PartiallySorted([0]) +02)--AggregateExec: mode=Single, gby=[country@1 as country, zip_code@0 as zip_code], aggr=[ARRAY_AGG(s.amount) ORDER BY [s.amount DESC NULLS FIRST], SUM(s.amount)], ordering_mode=PartiallySorted([0]) 03)----SortExec: expr=[country@1 ASC NULLS LAST,amount@2 DESC] 04)------MemoryExec: partitions=1, partition_sizes=[1] @@ -2610,7 +2610,7 @@ logical_plan 05)--------TableScan: sales_global projection=[country, amount] physical_plan 01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1] -02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=Sorted +02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST], SUM(s.amount)], ordering_mode=Sorted 03)----SortExec: expr=[country@0 ASC NULLS LAST] 04)------MemoryExec: partitions=1, partition_sizes=[1] @@ -2645,7 +2645,7 @@ logical_plan 05)--------TableScan: sales_global projection=[country, amount] physical_plan 01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST]@1 as amounts, SUM(s.amount)@2 as sum1] -02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount), SUM(s.amount)], ordering_mode=Sorted +02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST], SUM(s.amount)], ordering_mode=Sorted 03)----SortExec: expr=[country@0 ASC NULLS LAST,amount@1 DESC] 04)------MemoryExec: partitions=1, partition_sizes=[1] @@ -2677,7 +2677,7 @@ logical_plan 03)----TableScan: sales_global projection=[country, amount] physical_plan 01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2] -02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]] 03)----SortExec: expr=[amount@1 DESC] 04)------MemoryExec: partitions=1, partition_sizes=[1] @@ -2708,7 +2708,7 @@ logical_plan 03)----TableScan: sales_global projection=[country, amount] physical_plan 01)ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2] -02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)] +02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]] 03)----SortExec: expr=[amount@1 ASC NULLS LAST] 04)------MemoryExec: partitions=1, partition_sizes=[1] @@ -2740,7 +2740,7 @@ logical_plan 03)----TableScan: sales_global projection=[country, amount] physical_plan 01)ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@3 as amounts] -02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), ARRAY_AGG(sales_global.amount)] +02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]] 03)----SortExec: expr=[amount@1 ASC NULLS LAST] 04)------MemoryExec: partitions=1, partition_sizes=[1] @@ -2770,7 +2770,7 @@ logical_plan 03)----TableScan: sales_global projection=[country, ts, amount] physical_plan 01)ProjectionExec: expr=[country@0 as country, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as sum1, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as amounts] -02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[SUM(sales_global.amount), ARRAY_AGG(sales_global.amount)] +02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]] 03)----SortExec: expr=[amount@2 ASC NULLS LAST] 04)------MemoryExec: partitions=1, partition_sizes=[1] @@ -2805,7 +2805,7 @@ logical_plan 04)------TableScan: sales_global projection=[country, ts, amount] physical_plan 01)ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1] -02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)] +02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]] 03)----MemoryExec: partitions=1, partition_sizes=[1] query TRRR rowsort @@ -2838,7 +2838,7 @@ logical_plan 03)----TableScan: sales_global projection=[country, ts, amount] physical_plan 01)ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1] -02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)] +02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST], SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]] 03)----MemoryExec: partitions=1, partition_sizes=[1] query TRRR rowsort @@ -2874,7 +2874,7 @@ logical_plan physical_plan 01)SortExec: expr=[sn@2 ASC NULLS LAST] 02)--ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]@5 as last_rate] -03)----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[LAST_VALUE(e.amount)] +03)----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]] 04)------ProjectionExec: expr=[zip_code@2 as zip_code, country@3 as country, sn@4 as sn, ts@5 as ts, currency@6 as currency, sn@0 as sn, amount@1 as amount] 05)--------CoalesceBatchesExec: target_batch_size=8192 06)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1, projection=[sn@0, amount@3, zip_code@4, country@5, sn@6, ts@7, currency@8] @@ -2919,11 +2919,11 @@ physical_plan 01)SortPreservingMergeExec: [country@0 ASC NULLS LAST] 02)--SortExec: expr=[country@0 ASC NULLS LAST] 03)----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@2 as fv2] -04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]] 05)--------CoalesceBatchesExec: target_batch_size=8192 06)----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 07)------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -08)--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +08)--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]] 09)----------------MemoryExec: partitions=1, partition_sizes=[1] query TRR @@ -2955,11 +2955,11 @@ physical_plan 01)SortPreservingMergeExec: [country@0 ASC NULLS LAST] 02)--SortExec: expr=[country@0 ASC NULLS LAST] 03)----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as fv2] -04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]] 05)--------CoalesceBatchesExec: target_batch_size=8192 06)----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 07)------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 -08)--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +08)--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]] 09)----------------MemoryExec: partitions=1, partition_sizes=[1] query TRR @@ -2991,9 +2991,9 @@ logical_plan 03)----TableScan: sales_global projection=[ts, amount] physical_plan 01)ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@0 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv2] -02)--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +02)--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]] 03)----CoalescePartitionsExec -04)------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +04)------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]] 05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 06)----------MemoryExec: partitions=1, partition_sizes=[1] @@ -3017,9 +3017,9 @@ logical_plan 03)----TableScan: sales_global projection=[ts, amount] physical_plan 01)ProjectionExec: expr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@0 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv2] -02)--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +02)--AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]] 03)----CoalescePartitionsExec -04)------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +04)------AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]] 05)--------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 06)----------MemoryExec: partitions=1, partition_sizes=[1] @@ -3041,9 +3041,9 @@ logical_plan 03)----TableScan: sales_global projection=[ts, amount] physical_plan 01)ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@0 as array_agg1] -02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)] +02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]] 03)----CoalescePartitionsExec -04)------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(sales_global.amount)] +04)------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]] 05)--------SortExec: expr=[ts@0 ASC NULLS LAST] 06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 07)------------MemoryExec: partitions=1, partition_sizes=[1] @@ -3065,9 +3065,9 @@ logical_plan 03)----TableScan: sales_global projection=[ts, amount] physical_plan 01)ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@0 as array_agg1] -02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)] +02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]] 03)----CoalescePartitionsExec -04)------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(sales_global.amount)] +04)------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]] 05)--------SortExec: expr=[ts@0 DESC] 06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 07)------------MemoryExec: partitions=1, partition_sizes=[1] @@ -3089,9 +3089,9 @@ logical_plan 03)----TableScan: sales_global projection=[amount] physical_plan 01)ProjectionExec: expr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@0 as array_agg1] -02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount)] +02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]] 03)----CoalescePartitionsExec -04)------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(sales_global.amount)] +04)------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]] 05)--------SortExec: expr=[amount@0 ASC NULLS LAST] 06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 07)------------MemoryExec: partitions=1, partition_sizes=[1] @@ -3118,10 +3118,10 @@ physical_plan 01)SortPreservingMergeExec: [country@0 ASC NULLS LAST] 02)--SortExec: expr=[country@0 ASC NULLS LAST] 03)----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as array_agg1] -04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)] +04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]] 05)--------CoalesceBatchesExec: target_batch_size=4 06)----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 -07)------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)] +07)------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]] 08)--------------SortExec: expr=[amount@1 ASC NULLS LAST] 09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 10)------------------MemoryExec: partitions=1, partition_sizes=[1] @@ -3154,10 +3154,10 @@ physical_plan 01)SortPreservingMergeExec: [country@0 ASC NULLS LAST] 02)--SortExec: expr=[country@0 ASC NULLS LAST] 03)----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2] -04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +04)------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]] 05)--------CoalesceBatchesExec: target_batch_size=4 06)----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8 -07)------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] +07)------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]] 08)--------------SortExec: expr=[amount@1 DESC] 09)----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 10)------------------MemoryExec: partitions=1, partition_sizes=[1] @@ -3800,10 +3800,10 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, c, d] physical_plan 01)ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]@2 as last_c] -02)--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), LAST_VALUE(multiple_ordered_table.c)] +02)--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST], LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]] 03)----CoalesceBatchesExec: target_batch_size=2 04)------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8 -05)--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)] +05)--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST], FIRST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]] 06)----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true @@ -3870,7 +3870,7 @@ logical_plan 12)------------------TableScan: multiple_ordered_table projection=[a, d] physical_plan 01)ProjectionExec: expr=[LAST_VALUE(l.d) ORDER BY [l.a ASC NULLS LAST]@1 as amount_usd] -02)--AggregateExec: mode=Single, gby=[row_n@2 as row_n], aggr=[LAST_VALUE(l.d)], ordering_mode=Sorted +02)--AggregateExec: mode=Single, gby=[row_n@2 as row_n], aggr=[LAST_VALUE(l.d) ORDER BY [l.a ASC NULLS LAST]], ordering_mode=Sorted 03)----CoalesceBatchesExec: target_batch_size=2 04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d@1, d@1)], filter=CAST(a@0 AS Int64) >= CAST(a@1 AS Int64) - 10, projection=[a@0, d@1, row_n@4] 05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true @@ -4974,7 +4974,7 @@ logical_plan 02)--Aggregate: groupBy=[[multiple_ordered_table.a, multiple_ordered_table.b]], aggr=[[ARRAY_AGG(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]]] 03)----TableScan: multiple_ordered_table projection=[a, b, c] physical_plan -01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[ARRAY_AGG(multiple_ordered_table.c)], ordering_mode=Sorted +01)AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[ARRAY_AGG(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]], ordering_mode=Sorted 02)--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_orderings=[[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], [c@2 ASC NULLS LAST]], has_header=true query II? diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 65e6c17b9203..5ef33307b521 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3362,7 +3362,7 @@ logical_plan 08)----------TableScan: annotated_data projection=[a, b] physical_plan 01)ProjectionExec: expr=[a@0 as a, LAST_VALUE(r.b) ORDER BY [r.a ASC NULLS FIRST]@3 as last_col1] -02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)], ordering_mode=PartiallySorted([0]) +02)--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b) ORDER BY [r.a ASC NULLS FIRST]], ordering_mode=PartiallySorted([0]) 03)----CoalesceBatchesExec: target_batch_size=2 04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)] 05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true @@ -3410,7 +3410,7 @@ logical_plan 12)------------------TableScan: multiple_ordered_table projection=[a, d] physical_plan 01)ProjectionExec: expr=[LAST_VALUE(l.d) ORDER BY [l.a ASC NULLS LAST]@1 as amount_usd] -02)--AggregateExec: mode=Single, gby=[row_n@2 as row_n], aggr=[LAST_VALUE(l.d)], ordering_mode=Sorted +02)--AggregateExec: mode=Single, gby=[row_n@2 as row_n], aggr=[LAST_VALUE(l.d) ORDER BY [l.a ASC NULLS LAST]], ordering_mode=Sorted 03)----CoalesceBatchesExec: target_batch_size=2 04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d@1, d@1)], filter=CAST(a@0 AS Int64) >= CAST(a@1 AS Int64) - 10, projection=[a@0, d@1, row_n@4] 05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true @@ -3447,10 +3447,10 @@ physical_plan 01)SortPreservingMergeExec: [a@0 ASC] 02)--SortExec: expr=[a@0 ASC] 03)----ProjectionExec: expr=[a@0 as a, LAST_VALUE(r.b) ORDER BY [r.a ASC NULLS FIRST]@3 as last_col1] -04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)] +04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b) ORDER BY [r.a ASC NULLS FIRST]] 05)--------CoalesceBatchesExec: target_batch_size=2 06)----------RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 2), input_partitions=2 -07)------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b)] +07)------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[LAST_VALUE(r.b) ORDER BY [r.a ASC NULLS FIRST]] 08)--------------CoalesceBatchesExec: target_batch_size=2 09)----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)] 10)------------------CoalesceBatchesExec: target_batch_size=2 diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 155a176e8578..7196418af1c7 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -728,6 +728,32 @@ logical_plan 07)------Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]] 08)--------TableScan: t2 projection=[] +statement ok +set datafusion.explain.logical_plan_only = false; + +query TT +explain select (select count(*) from t1) as b, (select count(1) from t2) +---- +logical_plan +01)Projection: __scalar_sq_1.COUNT(*) AS b, __scalar_sq_2.COUNT(Int64(1)) AS COUNT(Int64(1)) +02)--Left Join: +03)----SubqueryAlias: __scalar_sq_1 +04)------Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1)) AS COUNT(*)]] +05)--------TableScan: t1 projection=[] +06)----SubqueryAlias: __scalar_sq_2 +07)------Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]] +08)--------TableScan: t2 projection=[] +physical_plan +01)ProjectionExec: expr=[COUNT(*)@0 as b, COUNT(Int64(1))@1 as COUNT(Int64(1))] +02)--NestedLoopJoinExec: join_type=Left +03)----ProjectionExec: expr=[4 as COUNT(*)] +04)------PlaceholderRowExec +05)----ProjectionExec: expr=[4 as COUNT(Int64(1))] +06)------PlaceholderRowExec + +statement ok +set datafusion.explain.logical_plan_only = true; + query II select (select count(*) from t1) as b, (select count(1) from t2) ----