Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moving PipelineFixer above all rules to use ExecutionPlan APIs #5880

Merged
merged 15 commits into from
Apr 6, 2023
Merged

Moving PipelineFixer above all rules to use ExecutionPlan APIs #5880

merged 15 commits into from
Apr 6, 2023

Conversation

metesynnada
Copy link
Contributor

@metesynnada metesynnada commented Apr 5, 2023

Which issue does this PR close?

Closes #5878.

Rationale for this change

Since some strongly dependent optimizer rules affect each other, rule ordering becomes more important. PipelineFixer (maybe more rules in the future) can change the ExecutionPlan at a level, and the new ExecutionPlan can have a different set of flags (maybe ordering, distribution, or more).

I suggest making the executor changer rules above the rules that fill the sort, distribution, etc.

If the sources are also sorted, we are looking for keeping the order information without adding additional SortExec. However, current planner results

[
    "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } }, op: And, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Lt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(10) } } } }",
    "  CoalesceBatchesExec: target_batch_size=8192",
    "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=8",
    "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "        CsvExec: files={1 group: [[private/var/folders/rf/dhj0s83d57l2_m51k2dmd_ch0000gn/T/.tmpcRbDJD/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
    "  CoalesceBatchesExec: target_batch_size=8192",
    "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=8",
    "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
    "        CsvExec: files={1 group: [[private/var/folders/rf/dhj0s83d57l2_m51k2dmd_ch0000gn/T/.tmpcRbDJD/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
]

and, unfortunately, two consecutive RepartitionExec removes the order information. To prevent this, SHJ particularly sets benefits_from_input_partitioning to false, however, it is ineffective since RepartitionExec::RounRobin is added before HashJoin -> SymmetricHashJoin change.

If we move the Repartition rule below the PipelineFixer, we are able to use SymmetricHashJoinExec 's benefits_from_input_partitioning API effectively.

[
    "SymmetricHashJoinExec: join_type=Full, on=[(Column { name: \"a2\", index: 1 }, Column { name: \"a2\", index: 1 })], filter=BinaryExpr { left: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Gt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(3) } } }, op: And, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 0 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Lt, right: BinaryExpr { left: CastExpr { expr: Column { name: \"a1\", index: 1 }, cast_type: Int64, cast_options: CastOptions { safe: false } }, op: Plus, right: Literal { value: Int64(10) } } } }",
    "  CoalesceBatchesExec: target_batch_size=8192",
    "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
    "      CsvExec: files={1 group: [[private/var/folders/rf/dhj0s83d57l2_m51k2dmd_ch0000gn/T/.tmpdTwdrk/left.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
    "  CoalesceBatchesExec: target_batch_size=8192",
    "    RepartitionExec: partitioning=Hash([Column { name: \"a2\", index: 1 }], 8), input_partitions=1",
    "      CsvExec: files={1 group: [[private/var/folders/rf/dhj0s83d57l2_m51k2dmd_ch0000gn/T/.tmpdTwdrk/right.csv]]}, has_header=false, limit=None, projection=[a1, a2]",
]

What changes are included in this PR?

Optimizer reorders to use the benefits_from_input_partitioning API. If we call PipelineFixer above (almost) everything, we can leverage the changed executor APIs in the optimizer.

Are these changes tested?

Yes.

Are there any user-facing changes?

No.

@github-actions github-actions bot added the core Core DataFusion crate label Apr 5, 2023
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense to me -- thank you @metesynnada

@alamb alamb merged commit 4f40070 into apache:main Apr 6, 2023
@metesynnada metesynnada deleted the feature/repartition-for-stream branch April 7, 2023 21:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Moving PipelineFixer above all rules to use ExecutionPlan APIs
4 participants