Update ExecutionPlan to know about sortedness and repartitioning optimizer pass respect the invariants #1776

Merged
merged 12 commits into from
Feb 9, 2022

@alamb alamb commented Feb 7, 2022

Draft until I have completed testing downstream in IOx

Which issue does this PR close?

Closes #424 ( Design how to respect output stream ordering )

Along with #1732, fixes #423 (the last part).

Rationale for this change

Repartitioning the input to an operator that relies on its input being sorted is incorrect: the repartitioning intermixes the partitions and effectively "unsorts" the input stream.

We found this in IOx: https://github.com/influxdata/influxdb_iox/pull/3633#issuecomment-1030126757

Here is a picture showing the problem:

    ┌─────────────────────────────────┐
    │                                 │
    │     SortPreservingMergeExec     │
    │                                 │
    └─────────────────────────────────┘
              ▲      ▲       ▲            Input may not
              │      │       │             be sorted!
      ┌───────┘      │       └───────┐
      │              │               │
      │              │               │
┌───────────┐  ┌───────────┐   ┌───────────┐
│           │  │           │   │           │
│ batch A1  │  │ batch A3  │   │ batch B3  │
│           │  │           │   │           │
├───────────┤  ├───────────┤   ├───────────┤
│           │  │           │   │           │
│ batch B2  │  │ batch B1  │   │ batch A2  │
│           │  │           │   │           │
└───────────┘  └───────────┘   └───────────┘
      ▲              ▲               ▲
      │              │               │
      └─────────┐    │    ┌──────────┘
                │    │    │                  Outputs
                │    │    │                batches are
    ┌─────────────────────────────────┐   repartitioned
    │       RepartitionExec(3)        │    and may not
    │           RoundRobin            │   remain sorted
    │                                 │
    └─────────────────────────────────┘
                ▲         ▲
                │         │                Inputs are
          ┌─────┘         └─────┐            sorted
          │                     │
          │                     │
          │                     │
    ┌───────────┐         ┌───────────┐
    │           │         │           │
    │ batch A1  │         │ batch B1  │
    │           │         │           │
    ├───────────┤         ├───────────┤
    │           │         │           │
    │ batch A2  │         │ batch B2  │
    │           │         │           │
    ├───────────┤         ├───────────┤
    │           │         │           │
    │ batch A3  │         │ batch B3  │
    │           │         │           │
    └───────────┘         └───────────┘


     Sorted Input          Sorted Input
           A                     B

The streams need to remain the way they were, with no repartitioning between the sorted inputs and the merge:

┌─────────────────────────────────┐
│                                 │
│     SortPreservingMergeExec     │
│                                 │
└─────────────────────────────────┘
            ▲         ▲
            │         │         Inputs are
      ┌─────┘         └─────┐   sorted, as
      │                     │    required
      │                     │
      │                     │
┌───────────┐         ┌───────────┐
│           │         │           │
│ batch A1  │         │ batch B1  │
│           │         │           │
├───────────┤         ├───────────┤
│           │         │           │
│ batch A2  │         │ batch B2  │
│           │         │           │
├───────────┤         ├───────────┤
│           │         │           │
│ batch A3  │         │ batch B3  │
│           │         │           │
└───────────┘         └───────────┘


 Sorted Input          Sorted Input
       A                     B
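
For illustration only (this is not code from the PR), here is a minimal sketch of the problem in the first picture. The `Batch`/`Partition` aliases and the `round_robin` and `is_sorted` helpers are hypothetical, and the batches are handed out in a fixed order, whereas in a real engine the arrival order likely depends on timing.

```rust
/// A batch is modelled as a vector of sort keys; a partition is a sequence of batches.
/// (Hypothetical stand-ins, not DataFusion types.)
type Batch = Vec<i32>;
type Partition = Vec<Batch>;

/// Hand batches out to `n` output partitions in round-robin order,
/// reading the input partitions one after another.
fn round_robin(inputs: &[Partition], n: usize) -> Vec<Partition> {
    let mut outputs: Vec<Partition> = vec![Vec::new(); n];
    let mut next = 0;
    for partition in inputs {
        for batch in partition {
            outputs[next % n].push(batch.clone());
            next += 1;
        }
    }
    outputs
}

/// True if the rows of a partition, read batch by batch, are non-decreasing.
fn is_sorted(partition: &Partition) -> bool {
    let rows: Vec<i32> = partition.iter().flatten().copied().collect();
    rows.windows(2).all(|w| w[0] <= w[1])
}
```

With input A holding batches `[10, 11]`, `[12, 13]`, `[14, 15]` and input B holding `[1, 2]`, `[3, 4]`, `[5, 6]`, both inputs are sorted, but the first output partition receives `[10, 11]` followed by `[1, 2]`, which a sort-preserving merge cannot consume correctly.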

What changes are included in this PR?

  1. Add several "metadata" functions to ExecutionPlan that describe its sortedness and the invariants required for its input
  2. Teach the repartitioning optimizer pass to respect the invariants

Are there any user-facing changes?

Yes: All ExecutionPlans are now required to implement output_ordering as described by @andygrove here #424 (comment)

The rationale for not providing a default implementation (None) was to force anyone who implements ExecutionPlan to think about sort orders. If they do not, (very!) subtle bugs are possible as DataFusion starts to rely more on sortedness for optimizations.
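
As a rough sketch of the shape of this API (a simplified stand-in, not the actual trait): `Plan`, `SortExpr`, `UnsortedScan` and `Sort` below are hypothetical names. The defaults for `maintains_input_order` and `benefits_from_input_partitioning` follow the doc comments quoted in this thread; `relies_on_input_order` is left without a default here only because its default is not shown in this conversation.

```rust
/// Simplified stand-in for `PhysicalSortExpr` (hypothetical).
#[derive(Debug, Clone, PartialEq)]
struct SortExpr {
    column: String,
    descending: bool,
}

/// Simplified stand-in for the metadata part of `ExecutionPlan`.
trait Plan {
    /// Required: there is deliberately no default, so every implementor
    /// has to state whether (and how) its output is sorted.
    fn output_ordering(&self) -> Option<&[SortExpr]>;

    /// Required in this sketch only (assumption, see the note above).
    fn relies_on_input_order(&self) -> bool;

    /// Default `false`, as in the doc comment quoted in the review.
    fn maintains_input_order(&self) -> bool {
        false
    }

    /// Default `true`, as in the doc comment quoted in the review.
    fn benefits_from_input_partitioning(&self) -> bool {
        true
    }
}

/// A source with no known sort order.
struct UnsortedScan;

impl Plan for UnsortedScan {
    fn output_ordering(&self) -> Option<&[SortExpr]> {
        None
    }
    fn relies_on_input_order(&self) -> bool {
        false
    }
}

/// A sort reports the ordering it produces.
struct Sort {
    exprs: Vec<SortExpr>,
}

impl Plan for Sort {
    fn output_ordering(&self) -> Option<&[SortExpr]> {
        Some(self.exprs.as_slice())
    }
    fn relies_on_input_order(&self) -> bool {
        false
    }
}
```

Because `output_ordering` has no default body, forgetting to implement it is a compile error rather than a silent `None`.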

cc @tustvold @Dandandan

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Feb 7, 2022
fn optimize_partitions(
target_partitions: usize,
plan: Arc<dyn ExecutionPlan>,
should_repartition: bool,
can_reorder: bool,
Contributor Author

Here is the change to the repartition logic to not repartition if it would produce incorrect answers

#[test]
fn added_repartition_to_single_partition() -> Result<()> {
let optimizer = Repartition {};
let plan = hash_aggregate(parquet_exec());
Contributor Author

I cleaned up the tests here to reduce the ceremony of invoking the optimizer. The plans are all the same

Ok(())
}

#[test]
fn repartition_ignores_limit() -> Result<()> {
let optimizer = Repartition {};
fn repartition_unsorted_limit() -> Result<()> {
Contributor Author

new plans showing that data isn't repartitioned below limits if sorts are present

Contributor

These tests are 👌

"GlobalLimitExec: limit=100",
"LocalLimitExec: limit=100",
"FilterExec: c1@0",
// data is sorted so can't repartition here even though
Contributor Author

However, once you put a sort here then repartitioning can't happen without potentially getting wrong results

@@ -82,6 +83,10 @@ impl ExecutionPlan for AnalyzeExec {
Partitioning::UnknownPartitioning(1)
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
Contributor Author

Having to sprinkle output_ordering around was annoying -- but I think it may be worth it to try and avoid some nasty bugs.

Contributor

Agree, makes sense to be explicit

@@ -300,11 +335,6 @@ impl ExecutionPlan for LocalLimitExec {
_ => Statistics::default(),
}
}

fn should_repartition_children(&self) -> bool {
Contributor Author

this is effectively renamed to benefits_from_input_partitioning

@@ -147,24 +147,59 @@ pub trait ExecutionPlan: Debug + Send + Sync {
Distribution::UnspecifiedDistribution
}

/// Returns `true` if the direct children of this `ExecutionPlan` should be repartitioned
/// to introduce greater concurrency to the plan
/// Returns `true` if this operator relies on its inputs being
Contributor Author

Here is the new API for ExecutionPlan that signals how / when repartitioning occurs

@alamb alamb changed the title Add `output_ ExecutionPlan reports on sortedness and repartitioning optimizer pass respect the invariants Feb 7, 2022
@alamb alamb added the api change Changes the API exposed to users of the crate label Feb 7, 2022
@alamb alamb changed the title ExecutionPlan reports on sortedness and repartitioning optimizer pass respect the invariants Update ExecutionPlan reports on sortedness and repartitioning optimizer pass respect the invariants Feb 7, 2022
@alamb alamb changed the title Update ExecutionPlan reports on sortedness and repartitioning optimizer pass respect the invariants Update ExecutionPlan to know about sortedness and repartitioning optimizer pass respect the invariants Feb 7, 2022
@alamb alamb marked this pull request as ready for review February 7, 2022 22:01
@@ -114,6 +115,14 @@ impl ExecutionPlan for WindowAggExec {
self.input.output_partitioning()
}

fn maintains_input_order(&self) -> bool {
Contributor

Shouldn't this also have relies_on_input_order?

Contributor

@tustvold tustvold left a comment

I've spent a depressingly long time staring at this, and I think it is correct - nice work 👍.

However, I am a little bit uncertain about output_ordering. My understanding is it is present to allow repartitioning of branches with order-sensitive operators, such as limit, but no explicit order.

I worry that this will lead to two classes of hard to track down bugs:

  1. ExecutionPlans that incorrectly report None for output_ordering
  2. Plans that make assumptions about ordering without encoding this into DataFusion

An example of 2. might be a plan that scans a sorted file, without the file itself exposing to DataFusion that it is sorted.

I guess I just wonder if this is really worth the potential headaches 😅

@@ -232,6 +249,24 @@ impl ExecutionPlan for LocalLimitExec {
self.input.output_partitioning()
}

fn relies_on_input_order(&self) -> bool {
self.input.output_ordering().is_some()
Contributor

This feels like an optimization that really belongs in the Repartition optimizer, namely that if the children of a plan don't have a sort order, you can freely repartition them even if the parent relies_on_input_order.

(false, false) => {
// `plan` may reorder the input itself, so no need
// to preserve the order of any children
true
Contributor

I think this has lost the requires_single_partition case, that being said I'm not sure why this matters? A CoalesceBatches will just be inserted? Perhaps would_benefit should be set to false if this requires a single partition, as this won't propagate beyond the direct children? 🤔

Contributor Author

Interestingly, when I add the requires_single_partition case here, I get test failures. Expected:

[
    "SortPreservingMergeExec: [c1@0 ASC]",
    "SortExec: [c1@0 ASC]",
    "ProjectionExec: expr=[c1@0 as c1]",
    "RepartitionExec: partitioning=RoundRobinBatch(10)",
    "ParquetExec: limit=None, partitions=[x]",
]
actual:

[
    "SortPreservingMergeExec: [c1@0 ASC]",
    "SortExec: [c1@0 ASC]",
    "ProjectionExec: expr=[c1@0 as c1]",
    "ParquetExec: limit=None, partitions=[x]",
]

In other words, repartitioning doesn't happen two levels down.

So rather than intermix the "should we bother repartitioning" check with the "would it produce wrong answers" check, I simply removed the check for the required input partitioning, and it is now included in the default "benefits from repartitioning" check.

/// can not be reordered as something upstream is relying on that order
///
/// If `would_benefit` is false, the upstream operator doesn't
/// benefit from additional reordering
Contributor

Suggested change
- /// benefit from additional reordering
+ /// benefit from additional partitioning

/// The default implementation returns `true`
fn benefits_from_input_partitioning(&self) -> bool {
// give me MOAR CPUs
true
Contributor

Why did you remove the required_child_distribution?

Contributor Author

I was trying to separate the notions of correctness from possible optimizations; However when I type out the rationale it doesn't really hold up; I will put it back.

@@ -36,33 +128,70 @@ impl Repartition {
}
}

/// Recursively visits all `plan`s puts and then optionally adds a
Contributor

For my own understanding I'm going to write out what this does.

It does a depth first scan of the tree, and repartitions any plan that:

  • Has less than the desired number of partitions
  • Has a direct parent that benefits_from_input_partitioning
  • Does not have a parent that relies_on_input_order, unless there is an intervening node for which maintains_input_order is false

Contributor Author

"Has a direct parent that benefits_from_input_partitioning"

I think this is "Has any parent that benefits_from_input_partitioning", unless there is an intervening node.

Otherwise yes. I will add this summary as a comment. Thank you
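
To make the summary above concrete, here is a toy sketch of the traversal (not the code in this PR). `Node`, `node` and `optimize` are hypothetical; the three flags stand in for the `ExecutionPlan` methods of the same names, and the `(relies, maintains)` match follows the fragment quoted earlier in this review. The sketch passes only the direct parent's `benefits_from_input_partitioning` down to each child, and the root is called with both flags `false` so it is never wrapped; both choices are assumptions of the sketch.

```rust
/// Toy stand-in for an `ExecutionPlan` node (hypothetical).
#[derive(Debug, Clone, PartialEq)]
struct Node {
    name: &'static str,
    partitions: usize,
    relies_on_input_order: bool,
    maintains_input_order: bool,
    benefits_from_input_partitioning: bool,
    children: Vec<Node>,
}

/// Build a node; `flags` is (relies_on_input_order, maintains_input_order,
/// benefits_from_input_partitioning).
fn node(
    name: &'static str,
    partitions: usize,
    flags: (bool, bool, bool),
    children: Vec<Node>,
) -> Node {
    Node {
        name,
        partitions,
        relies_on_input_order: flags.0,
        maintains_input_order: flags.1,
        benefits_from_input_partitioning: flags.2,
        children,
    }
}

/// Depth-first rewrite: optimize the children first, then decide whether the
/// output of `plan` itself should be wrapped in a repartition node.
fn optimize(target: usize, mut plan: Node, can_reorder: bool, would_benefit: bool) -> Node {
    let can_reorder_children = match (plan.relies_on_input_order, plan.maintains_input_order) {
        // `plan` itself needs its input order, so children must not be reordered
        (true, _) => false,
        // `plan` may reorder its input anyway, so children are free to be reordered
        (false, false) => true,
        // `plan` passes order through, so the decision belongs to its ancestors
        (false, true) => can_reorder,
    };
    let would_benefit_children = plan.benefits_from_input_partitioning;

    plan.children = std::mem::take(&mut plan.children)
        .into_iter()
        .map(|child| optimize(target, child, can_reorder_children, would_benefit_children))
        .collect();

    if can_reorder && would_benefit && plan.partitions < target {
        node("RepartitionExec", target, (false, false, true), vec![plan])
    } else {
        plan
    }
}
```

With these toy flags, a single-partition scan under an aggregate gets a repartition node inserted above it, a scan directly under a sort-preserving merge is left alone, and a scan under merge-then-sort is repartitioned because the sort is an intervening node that does not maintain input order.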

/// such as automatically repartitioning correctly.
///
/// The default implementation returns `false`
fn maintains_input_order(&self) -> bool {
Contributor

I spent a long time trying to understand why there is both this and output_ordering and it is because this indicates if the operator preserves the order, not if that order is actually sorted 😅

Contributor Author

Yes. I will make this clearer in the comments

Member

@xudong963 xudong963 left a comment

I have looked carefully at this PR and its related issues. I think some of the related historical issues can be resolved by this PR. BTW, the tests are solid! 👍 @alamb

fn optimize_partitions(
target_partitions: usize,
plan: Arc<dyn ExecutionPlan>,
should_repartition: bool,
can_reorder: bool,
would_benefit: bool,
Member

I don't quite understand this variable. If `would_benefit` is false, the upstream operator doesn't benefit from additional reordering, but repartitioning wouldn't produce wrong results? So is it OK to repartition to benefit from higher parallelism? If so, I think the variable is unnecessary.

I have now seen the doc comment on the benefits_from_input_partitioning function 👍, so the variable makes sense to me.

/// parallelism may outweigh any benefits
///
/// The default implementation returns `true`
fn benefits_from_input_partitioning(&self) -> bool {
Member

I noticed that the return value for sort, limit, and union is false, so I would like to know how the result is decided. In other words, how do we decide that the overhead of extra parallelism may outweigh any benefits? Is this an empirical estimate?

Contributor Author

I think it defaults to true, actually.

Perhaps you mean why do sort, limit, and union override the default maintains_input_order? If so, the reason is that I know how they are implemented. The code on master is making the same assumption, FWIW, but after this PR the assumption is explicit:

    fn maintains_input_order(&self) -> bool {
        // tell optimizer this operator doesn't reorder its input
        true
    }

alamb and others added 2 commits February 8, 2022 11:56
Co-authored-by: xudong.w <wxd963996380@gmail.com>
Co-authored-by: xudong.w <wxd963996380@gmail.com>
Contributor Author

alamb commented Feb 8, 2022

However, I am a little bit uncertain about output_ordering. My understanding is it is present to allow repartitioning of branches with order-sensitive operators, such as limit, but no explicit order.

I think that is correct. The specific case where output_ordering is currently required for correctness is distinguishing between

Limit
Filter
Scan

And

Limit
Sort
Scan
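
As a sketch of how these two plans differ (simplified, with a hypothetical `Plan` enum rather than the real operators): the limit only relies on its input order when something below it actually reports an ordering, which mirrors the `self.input.output_ordering().is_some()` check shown for LocalLimitExec in the diff above. That the scan is unsorted and that filter and limit pass their input ordering through are assumptions of the sketch.

```rust
/// Hypothetical miniature plan tree.
enum Plan {
    Scan,
    Filter(Box<Plan>),
    Sort { column: &'static str, input: Box<Plan> },
    Limit(Box<Plan>),
}

impl Plan {
    /// The column this node's output is sorted on, if any.
    fn output_ordering(&self) -> Option<&'static str> {
        match self {
            // assumption: the scan has no known sort order
            Plan::Scan => None,
            // assumption: filter and limit keep rows in input order
            Plan::Filter(input) | Plan::Limit(input) => input.output_ordering(),
            Plan::Sort { column, .. } => Some(*column),
        }
    }

    /// A limit over sorted input must see that order; over unsorted input
    /// any rows will do, so its input may be repartitioned.
    fn relies_on_input_order(&self) -> bool {
        match self {
            Plan::Limit(input) => input.output_ordering().is_some(),
            _ => false,
        }
    }
}
```

In this sketch `Limit(Filter(Scan))` reports `false` for `relies_on_input_order`, while `Limit(Sort(Scan))` reports `true`.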

I worry that this will lead to two classes of hard to track down bugs:

  1. ExecutionPlans that incorrectly report None for output_ordering
  2. Plans that make assumptions about ordering without encoding this into DataFusion

Yes, I think these are indeed two classes of hard to track down bugs that can/will occur if DataFusion starts optimizing based on sort orders (cc @NGA-TRAN). One might argue that we already have one example of such a bug in #423 😆. I will add some more comments to try and make it harder to forget.

I guess I just wonder if this is really worth the potential headaches 😅

Well the real question is what is the alternative 🤔 Some thoughts are:

  1. Be conservative for operators like Limit and simply don't repartition / do anything to their inputs
  2. I could also special case Limit (for example look for a SortExec or SortPreservingMerge anywhere below it)

Contributor Author

alamb commented Feb 8, 2022

I also see output_ordering as the foundation for more sophisticated transformations, such as avoiding sorts when the input data is already sorted (for example, because the parquet file was already sorted, or because IOx sorted the data to deduplicate it)

Contributor Author

alamb commented Feb 8, 2022

@tustvold and @xudong963 I think I have addressed all of your comments.

@@ -278,6 +279,14 @@ impl ExecutionPlan for HashJoinExec {
self.right.output_partitioning()
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
Contributor

Probably doesn't make sense to address this now, but order of the right side might be (fully or partially) maintained for hash joins.

Contributor

To give a concrete example, if the right side of the join is sorted on field x and we use an inner join, the output is sorted on x too as rows are not reordered.

Contributor Author

@alamb alamb Feb 8, 2022

👍 makes sense.

I think that only applies for inner joins though (as some types of outer joins may stick nulls into inconvenient places 😆 )

Contributor

Yes 😂 I think right join is the other one that maintains sortedness too.
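
A small sketch of why an inner hash join can keep the order of one side (an illustration, not HashJoinExec itself): it assumes the right input is the probe side that is streamed row by row against a hash table built from the left input. `inner_hash_join` and the row shapes are hypothetical.

```rust
use std::collections::HashMap;

/// Inner hash join on a `u32` key.
/// `build` rows are (key, payload); `probe` rows are (key, x).
/// Output rows are (payload, x), emitted in probe order.
fn inner_hash_join<'a>(build: &[(u32, &'a str)], probe: &[(u32, i32)]) -> Vec<(&'a str, i32)> {
    // Build phase: group payloads by key. Hash map iteration order is never
    // used below, so it cannot affect the order of the output.
    let mut table: HashMap<u32, Vec<&'a str>> = HashMap::new();
    for &(key, payload) in build {
        table.entry(key).or_default().push(payload);
    }

    // Probe phase: rows are visited in input order, and every match for a
    // probe row is emitted before moving on to the next probe row.
    let mut out = Vec::new();
    for &(key, x) in probe {
        if let Some(matches) = table.get(&key) {
            for &payload in matches {
                out.push((payload, x));
            }
        }
    }
    out
}
```

If the probe rows arrive sorted on `x`, the output is sorted on `x` as well, because probe rows are never reordered and unmatched rows are simply dropped. Join types that emit unmatched build-side rows padded with nulls do not fit this pattern, which is the caveat raised above.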

Contributor Author

alamb commented Feb 8, 2022

Something else I have been musing about is how to handle knowledge that the data is sorted only after a partition is executed.

For example, let's say in some future world, that when GroupByHash spills to disk it will produce the output in sorted group key order. If this is then fed into a Sort then at runtime if the GroupByHash spills the sort could simply merge its input partitions rather than having to actually sort them.

🤔
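
To illustrate the "merge rather than sort" idea (a generic k-way merge, not a proposal for how DataFusion would implement it): if every input partition is already known to be sorted, the combined sorted output can be produced with a heap of one cursor per partition. `merge_sorted` is a hypothetical helper working on plain vectors.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge partitions that are each already sorted ascending into one
/// sorted vector, without re-sorting the rows.
fn merge_sorted(partitions: &[Vec<i32>]) -> Vec<i32> {
    // Min-heap of (value, partition index, offset within that partition).
    let mut heap = BinaryHeap::new();
    for (p, part) in partitions.iter().enumerate() {
        if let Some(&first) = part.first() {
            heap.push(Reverse((first, p, 0usize)));
        }
    }

    let total: usize = partitions.iter().map(Vec::len).sum();
    let mut out = Vec::with_capacity(total);
    while let Some(Reverse((value, p, i))) = heap.pop() {
        out.push(value);
        // Advance the cursor of the partition the value came from.
        if let Some(&next) = partitions[p].get(i + 1) {
            heap.push(Reverse((next, p, i + 1)));
        }
    }
    out
}
```

For n rows spread over k partitions this does O(n log k) comparisons rather than the O(n log n) of a full sort, but it is only correct if each partition really is sorted, which is exactly the knowledge that would have to be available at runtime.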

Contributor Author

alamb commented Feb 9, 2022

Thanks to everyone who took a look at this 👍

Labels
api change Changes the API exposed to users of the crate datafusion Changes in the datafusion crate