Skip to content

Commit

Permalink
Partial Sort Plan Implementation (#9125)
Browse files Browse the repository at this point in the history
* partial sort plan implementation

* implement limiting the stream and remove topk

* update documentation

* rename replace_with_partial_sort tests

* Fix imports

* Add new end to end test

* Address comments

* cargo fmt

* Address comments

* addressing comments contd

---------

Co-authored-by: Ahmet Enis Erdogan <aeniserdogan@gmail.com>
Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
  • Loading branch information
3 people authored Feb 9, 2024
1 parent 30d2be9 commit c9e4b7b
Show file tree
Hide file tree
Showing 4 changed files with 1,301 additions and 1 deletion.
140 changes: 139 additions & 1 deletion datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::partial_sort::PartialSortExec;
use datafusion_physical_plan::unbounded_output;

use itertools::izip;

Expand Down Expand Up @@ -185,7 +187,10 @@ impl PhysicalOptimizerRule for EnforceSorting {
let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
assign_initial_requirements(&mut sort_pushdown);
let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?;
Ok(adjusted.plan)

adjusted
.plan
.transform_up(&|plan| Ok(Transformed::Yes(replace_with_partial_sort(plan)?)))
}

fn name(&self) -> &str {
Expand All @@ -197,6 +202,42 @@ impl PhysicalOptimizerRule for EnforceSorting {
}
}

fn replace_with_partial_sort(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
let plan_any = plan.as_any();
if let Some(sort_plan) = plan_any.downcast_ref::<SortExec>() {
let child = sort_plan.children()[0].clone();
if !unbounded_output(&child) {
return Ok(plan);
}

// here we're trying to find the common prefix for sorted columns that is required for the
// sort and already satisfied by the given ordering
let child_eq_properties = child.equivalence_properties();
let sort_req = PhysicalSortRequirement::from_sort_exprs(sort_plan.expr());

let mut common_prefix_length = 0;
while child_eq_properties
.ordering_satisfy_requirement(&sort_req[0..common_prefix_length + 1])
{
common_prefix_length += 1;
}
if common_prefix_length > 0 {
return Ok(Arc::new(
PartialSortExec::new(
sort_plan.expr().to_vec(),
sort_plan.input().clone(),
common_prefix_length,
)
.with_preserve_partitioning(sort_plan.preserve_partitioning())
.with_fetch(sort_plan.fetch()),
));
}
}
Ok(plan)
}

/// This function turns plans of the form
/// ```text
/// "SortExec: expr=\[a@0 ASC\]",
Expand Down Expand Up @@ -2205,4 +2246,101 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_replace_with_partial_sort() -> Result<()> {
let schema = create_test_schema3()?;
let input_sort_exprs = vec![sort_expr("a", &schema)];
let unbounded_input = stream_exec_ordered(&schema, input_sort_exprs);

let physical_plan = sort_exec(
vec![sort_expr("a", &schema), sort_expr("c", &schema)],
unbounded_input,
);

let expected_input = [
"SortExec: expr=[a@0 ASC,c@2 ASC]",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]"
];
let expected_optimized = [
"PartialSortExec: expr=[a@0 ASC,c@2 ASC], common_prefix_length=[1]",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
Ok(())
}

#[tokio::test]
async fn test_replace_with_partial_sort2() -> Result<()> {
let schema = create_test_schema3()?;
let input_sort_exprs = vec![sort_expr("a", &schema), sort_expr("c", &schema)];
let unbounded_input = stream_exec_ordered(&schema, input_sort_exprs);

let physical_plan = sort_exec(
vec![
sort_expr("a", &schema),
sort_expr("c", &schema),
sort_expr("d", &schema),
],
unbounded_input,
);

let expected_input = [
"SortExec: expr=[a@0 ASC,c@2 ASC,d@3 ASC]",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC, c@2 ASC]"
];
// let optimized
let expected_optimized = [
"PartialSortExec: expr=[a@0 ASC,c@2 ASC,d@3 ASC], common_prefix_length=[2]",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC, c@2 ASC]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
Ok(())
}

#[tokio::test]
async fn test_not_replaced_with_partial_sort_for_bounded_input() -> Result<()> {
let schema = create_test_schema3()?;
let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c", &schema)];
let parquet_input = parquet_exec_sorted(&schema, input_sort_exprs);

let physical_plan = sort_exec(
vec![
sort_expr("a", &schema),
sort_expr("b", &schema),
sort_expr("c", &schema),
],
parquet_input,
);
let expected_input = [
"SortExec: expr=[a@0 ASC,b@1 ASC,c@2 ASC]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[b@1 ASC, c@2 ASC]"
];
let expected_no_change = expected_input;
assert_optimized!(expected_input, expected_no_change, physical_plan, false);
Ok(())
}

#[tokio::test]
async fn test_not_replaced_with_partial_sort_for_unbounded_input() -> Result<()> {
let schema = create_test_schema3()?;
let input_sort_exprs = vec![sort_expr("b", &schema), sort_expr("c", &schema)];
let unbounded_input = stream_exec_ordered(&schema, input_sort_exprs);

let physical_plan = sort_exec(
vec![
sort_expr("a", &schema),
sort_expr("b", &schema),
sort_expr("c", &schema),
],
unbounded_input,
);
let expected_input = [
"SortExec: expr=[a@0 ASC,b@1 ASC,c@2 ASC]",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[b@1 ASC, c@2 ASC]"
];
let expected_no_change = expected_input;
assert_optimized!(expected_input, expected_no_change, physical_plan, true);
Ok(())
}
}
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/sorts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod builder;
mod cursor;
mod index;
mod merge;
pub mod partial_sort;
pub mod sort;
pub mod sort_preserving_merge;
mod stream;
Expand Down
Loading

0 comments on commit c9e4b7b

Please sign in to comment.