From 1b796b675ab7daa365431e680edd367c61ba6ed4 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 21 Nov 2023 09:19:40 +0300 Subject: [PATCH 1/2] Remove lost orderings from the final plan --- .../src/physical_optimizer/enforce_sorting.rs | 4 +++- datafusion/physical-plan/src/insert.rs | 19 ++++++--------- datafusion/sqllogictest/test_files/select.slt | 23 +++++++++++++++++++ 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 2590948d3b3e..6fec74f608ae 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -476,7 +476,9 @@ fn ensure_sorting( update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?; } } - (None, None) => {} + (None, None) => { + update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?; + } } } // For window expressions, we can remove some sorts when we can diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 4eeb58974aba..5fe5c3dda138 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -219,24 +219,19 @@ impl ExecutionPlan for FileSinkExec { } fn required_input_ordering(&self) -> Vec>> { - // The input order is either exlicitly set (such as by a ListingTable), - // or require that the [FileSinkExec] gets the data in the order the - // input produced it (otherwise the optimizer may chose to reorder - // the input which could result in unintended / poor UX) - // - // More rationale: - // https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178 + // The input order is either explicitly set (such as by a ListingTable), + // or has no requirement. match &self.sort_order { Some(requirements) => vec![Some(requirements.clone())], - None => vec![self - .input - .output_ordering() - .map(PhysicalSortRequirement::from_sort_exprs)], + None => vec![None], } } + // FileSinkExec maintains ordering in the sense that: File written will have + // the ordering of the input. + // See rationale: https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178 fn maintains_input_order(&self) -> Vec { - vec![false] + vec![true] } fn children(&self) -> Vec> { diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index 98ea061c731b..bb81c5a9a138 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -1013,6 +1013,29 @@ SortPreservingMergeExec: [c@3 ASC NULLS LAST] --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true +# When ordering lost during projection, we shouldn't keep the SortExec. +# in the final physical plan. +query TT +EXPLAIN SELECT c2, COUNT(*) +FROM (SELECT c2 +FROM aggregate_test_100 +ORDER BY c1, c2) +GROUP BY c2; +---- +logical_plan +Aggregate: groupBy=[[aggregate_test_100.c2]], aggr=[[COUNT(UInt8(1)) AS COUNT(*)]] +--Projection: aggregate_test_100.c2 +----Sort: aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST +------Projection: aggregate_test_100.c2, aggregate_test_100.c1 +--------TableScan: aggregate_test_100 projection=[c1, c2] +physical_plan +AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[COUNT(*)] +--CoalesceBatchesExec: target_batch_size=8192 +----RepartitionExec: partitioning=Hash([c2@0], 2), input_partitions=2 +------AggregateExec: mode=Partial, gby=[c2@0 as c2], aggr=[COUNT(*)] +--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2], has_header=true + statement ok drop table annotated_data_finite2; From d98884f5b74c0e156f47b4a9c1694bda57b0f403 Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Tue, 21 Nov 2023 14:57:03 +0300 Subject: [PATCH 2/2] Improve comments --- datafusion/physical-plan/src/insert.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 5fe5c3dda138..81cdfd753fe6 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -219,18 +219,16 @@ impl ExecutionPlan for FileSinkExec { } fn required_input_ordering(&self) -> Vec>> { - // The input order is either explicitly set (such as by a ListingTable), - // or has no requirement. - match &self.sort_order { - Some(requirements) => vec![Some(requirements.clone())], - None => vec![None], - } + // The required input ordering is set externally (e.g. by a `ListingTable`). + // Otherwise, there is no specific requirement (i.e. `sort_expr` is `None`). + vec![self.sort_order.as_ref().cloned()] } - // FileSinkExec maintains ordering in the sense that: File written will have - // the ordering of the input. - // See rationale: https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178 fn maintains_input_order(&self) -> Vec { + // Maintains ordering in the sense that the written file will reflect + // the ordering of the input. For more context, see: + // + // https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178 vec![true] }