
planner/core: support union all for mpp. #24287

Merged: 28 commits, Jun 2, 2021

Conversation

hanfei1991 (Member):

What problem does this PR solve?

Problem Summary:
Support union all operator push down

What is changed and how it works?

Proposal: proposal of union all

What's Changed:
Multiplying tasks during building MPP tasks.

Related changes

Check List

Tests

  • Unit test
  • Manual test (add detailed scripts or steps below)

Release note

  • planner/core: support union all for mpp.

@hanfei1991 hanfei1991 requested review from a team as code owners April 26, 2021 07:13
@hanfei1991 hanfei1991 requested review from XuHuaiyu and removed request for a team April 26, 2021 07:13
@ti-chi-bot ti-chi-bot added the size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. label Apr 26, 2021
}
ua := PhysicalUnionAll{}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...)
ua := PhysicalUnionAll{
mpp: prop.TaskTp == property.MppTaskType,
Contributor:

Should this be canUseMpp && prop.TaskTp == property.MppTaskType?

Member Author:

That's right, I will fix it.

}
*forest = append(*forest, p.(*PhysicalExchangeSender))
for i := 1; i < len(stack); i++ {
if _, ok := stack[i].(*PhysicalUnionAll); ok {
Contributor:

Do we need to adjust the columns below and above this removed UnionAll to keep the same UniqueID? Or replace this UnionAll with a Projection?

Member Author:

I think it's unnecessary, because TiFlash won't use UniqueID.

Comment on lines 2505 to 2529
if canUseMpp && prop.TaskTp == property.MppTaskType {
chReqProps = append(chReqProps, &property.PhysicalProperty{
ExpectedCnt: prop.ExpectedCnt,
TaskTp: property.MppTaskType,
})
} else {
chReqProps = append(chReqProps, &property.PhysicalProperty{ExpectedCnt: prop.ExpectedCnt})
}
}
ua := PhysicalUnionAll{}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...)
ua := PhysicalUnionAll{
mpp: prop.TaskTp == property.MppTaskType,
}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...)
ua.SetSchema(p.Schema())
if canUseMpp && prop.TaskTp == property.RootTaskType {
chReqProps = make([]*property.PhysicalProperty, 0, len(p.children))
for range p.children {
chReqProps = append(chReqProps, &property.PhysicalProperty{
ExpectedCnt: prop.ExpectedCnt,
TaskTp: property.MppTaskType,
})
}
mppUA := PhysicalUnionAll{mpp: true}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...)
mppUA.SetSchema(p.Schema())
return []PhysicalPlan{ua, mppUA}, true
}
Contributor:

It would be better to make clear which cases generate an MPP task and which cases generate a root task. This would avoid errors like the one pointed out above by kenan.


if canUseMpp && (prop.TaskTp == property.MppTaskType || prop.TaskTp == property.RootTaskType) {
    // generate mpp task
}
if !(canUseMpp && prop.TaskTp == property.MppTaskType) {
    // generate root task
}
...

Member Author:

Using a negative condition (a condition with "!") is a little troublesome and confusing.
Although it aligns the code nicely, I cannot figure out the meaning clearly by only reading the conditions.
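To make the debated case analysis concrete, here is a minimal Go sketch using only positive conditions, in the spirit of the author's preference; the names (plansToGenerate, taskType) are illustrative and are not TiDB's actual API:

```go
package main

import "fmt"

// taskType is a toy stand-in for property.TaskTp.
type taskType int

const (
	rootTaskType taskType = iota
	mppTaskType
)

// plansToGenerate models the case analysis with explicit positive cases:
// which physical UnionAll variants to enumerate for a requested task type.
// This is a sketch of the discussion above, not the actual planner code.
func plansToGenerate(canUseMpp bool, tp taskType) (genMpp, genRoot bool) {
	switch {
	case canUseMpp && tp == mppTaskType:
		return true, false // MPP property requested and pushable: MPP plan only
	case canUseMpp && tp == rootTaskType:
		return true, true // root requested: offer both, let the cost model decide
	default:
		return false, true // cannot use MPP: root plan only
	}
}

func main() {
	mpp, root := plansToGenerate(true, rootTaskType)
	fmt.Println(mpp, root) // true true
}
```

With explicit positive cases, each branch names the kind of task it produces, which is the readability the author argues for.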

s.Fragment = f
err := f.init(s)
return f, errors.Trace(err)
func flattenPlanForUnionAll(stack []PhysicalPlan, forest *[]*PhysicalExchangeSender) error {
Contributor:

Please add some comments.

func (p *PhysicalUnionAll) attach2Task(tasks ...task) task {
if _, ok := tasks[0].(*mppTask); ok {
Contributor:

Even if their children are MPP tasks, the UnionAll may still return a root task.

Member Author:

in what case?

Contributor:

In the TiDB-MPP mode, the union all is at TiDB while its children run in MPP.

srcCol := ch.Schema().Columns[i]
srcType := srcCol.RetType
if !srcType.Equal(dstType) || !(mysql.HasNotNullFlag(dstType.Flag) == mysql.HasNotNullFlag(srcType.Flag)) {
exprs[i] = expression.BuildCastFunction4Union(un.ctx, srcCol, dstType)
Contributor:

We should add casts like in the join, such as decimals with different precision and (uint8 = int32); besides, set the null property for these cast functions.

Member Author:

When comparing types, it takes the "length" and "decimal" of the types into consideration.
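As an illustration of the comparison being discussed, here is a toy Go sketch; fieldType and needCastForUnion are hypothetical stand-ins for types.FieldType and the real check, which also compares charset, collation, and more:

```go
package main

import "fmt"

// fieldType is a toy stand-in for types.FieldType, keeping only the
// fields relevant to this discussion.
type fieldType struct {
	tp      byte
	flen    int
	decimal int
	notNull bool
}

// needCastForUnion mirrors the check described above: a cast projection is
// needed when the types differ (including length and decimal) or when the
// NOT NULL flags disagree. The name is illustrative, not TiDB's actual API.
func needCastForUnion(src, dst fieldType) bool {
	typeEqual := src.tp == dst.tp && src.flen == dst.flen && src.decimal == dst.decimal
	return !typeEqual || src.notNull != dst.notNull
}

func main() {
	// decimal(10,2) vs decimal(12,4): same base type, different precision, so cast.
	a := fieldType{tp: 0xf6, flen: 10, decimal: 2, notNull: true}
	b := fieldType{tp: 0xf6, flen: 12, decimal: 4, notNull: true}
	fmt.Println(needCastForUnion(a, b)) // true
	// identical types but NOT NULL flags disagree: cast as well.
	c := fieldType{tp: 0xf6, flen: 10, decimal: 2, notNull: false}
	fmt.Println(needCastForUnion(a, c)) // true
}
```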

p = ch
}
case *PhysicalHashJoin:
stack = append(stack, x.children[1-x.InnerChildIdx])
Contributor:

  1. Does x.children[1-x.InnerChildIdx] mean the probe side?
  2. What about the other side, where a union may also occur?

Member Author:

  1. Yes.
  2. The other side can never have a union-all.
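A toy Go sketch of the traversal under discussion, descending only the probe side of a hash join; the plan struct and collectUnions are illustrative, not the planner's real types:

```go
package main

import "fmt"

// plan is a minimal toy plan node; in the real planner these are
// PhysicalHashJoin, PhysicalUnionAll, and so on.
type plan struct {
	name          string
	children      []*plan
	innerChildIdx int  // for hash joins: which child is the build (inner) side
	isHashJoin    bool
}

// collectUnions walks the plan; at a hash join it only descends the probe
// side (children[1-innerChildIdx]), matching the code under review, since
// the build side can never contain a union-all in this scheme.
func collectUnions(p *plan, out *[]string) {
	if p.name == "UnionAll" {
		*out = append(*out, p.name)
	}
	if p.isHashJoin {
		collectUnions(p.children[1-p.innerChildIdx], out)
		return
	}
	for _, ch := range p.children {
		collectUnions(ch, out)
	}
}

func main() {
	build := &plan{name: "TableScan"}
	probe := &plan{name: "UnionAll", children: []*plan{{name: "TableScan"}, {name: "TableScan"}}}
	join := &plan{name: "HashJoin", isHashJoin: true, innerChildIdx: 0, children: []*plan{build, probe}}
	var found []string
	collectUnions(join, &found)
	fmt.Println(found) // [UnionAll]
}
```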

Comment on lines +448 to +468
tk.MustQuery("select x1.a, x.a from x1 left join (select x2.b a, x1.b from x1 join x2 on x1.a = x2.b union all select * from x1 ) x on x1.a = x.a order by x1.a").Check(testkit.Rows("1 1", "1 1", "2 2", "2 2", "3 3", "3 3", "4 4", "4 4"))
tk.MustQuery("select x1.a, x.a from x1 left join (select count(*) a, sum(b) b from x1 group by a union all select * from x2 ) x on x1.a = x.a order by x1.a").Check(testkit.Rows("1 1", "1 1", "1 1", "1 1", "2 2", "3 3", "4 4"))

tk.MustExec("drop table if exists x3")
tk.MustExec("create table x3(a int , b int);")
tk.MustExec("alter table x3 set tiflash replica 1")
tb = testGetTableByName(c, tk.Se, "test", "x3")
err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)

tk.MustExec("insert into x3 values (2, 2), (2, 3), (2, 4)")
// test nested union all
tk.MustQuery("select count(*) from (select a, b from x1 union all select a, b from x3 union all (select x1.a, x3.b from (select * from x3 union all select * from x2) x3 left join x1 on x3.a = x1.b))").Check(testkit.Rows("14"))
// test union all join union all
tk.MustQuery("select count(*) from (select * from x1 union all select * from x2 union all select * from x3) x join (select * from x1 union all select * from x2 union all select * from x3) y on x.a = y.b").Check(testkit.Rows("29"))
tk.MustExec("set @@session.tidb_broadcast_join_threshold_count=100000")
failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(6)`)
tk.MustQuery("select count(*) from (select * from x1 union all select * from x2 union all select * from x3) x join (select * from x1 union all select * from x2 union all select * from x3) y on x.a = y.b").Check(testkit.Rows("29"))
failpoint.Disable("github.com/pingcap/tidb/executor/checkTotalMPPTasks")

}
Contributor:

  1. Please also check the plans.
  2. Generate more complex queries, e.g. agg, agg+join, and cases where the partition key is a subset of the join/group-by keys.
  3. Use randgen to test more complex cases.

Member Author:

We have checked the plan results in the plan tests. I agree with adding more tests, but it needs time.

Contributor:

Can the final plan be printed? It is important for debugging according to explain [analyze].

Member Author:

The plan that has eliminated the union-all cannot be shown.

Contributor:

Can it support explain analyze now?

Member Author:

Not yet.

return nil, true
}
canUseMpp := p.ctx.GetSessionVars().AllowMPPExecution && p.canPushToCop(kv.TiFlash)
Contributor:

This function does not deliver the partition property, which introduces more network overhead during exchange.
It would be better to deliver the partition property; the UnionAll then becomes an exchange receiver, and each branch sets a different exchange sender, such as passthrough or partition. For example:
group by R.a
|
(group by S.a, S.b union scan C) as R

With the current code, the plan becomes:

group by R.a
|
exchange receiver
|----exchange sender1[partition by S.a] - group by S.a, S.b[partition by S.a, S.b]
|----exchange sender2[partition by C.a] - Scan C

The optimal one is:

group by R.a
|
exchange receiver
|----exchange sender1[passthrough] - group by S.a, S.b[partition by S.a]
|----exchange sender2[partition by C.a] - Scan C

Here exchange sender1 sends to the local node and reduces network IO.

Member Author:

That's too complicated, and it is only beneficial in a few situations.

Contributor:

Eliminating shuffle is an important optimization for distributed plans. If a user's SQL is slow due to unnecessary exchanges, how will we support this case in the future?
Besides, it is not too complicated; the main idea is:

  1. deliver the data distribution property to the child,
  2. push up the UnionAll until the data distribution property no longer satisfies the needs, then
  3. enforce an exchange.
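The idea above can be sketched as a partition-property check per union branch; the names here (partitionProp, senderKind) are invented for illustration and are not TiDB's implementation:

```go
package main

import (
	"fmt"
	"reflect"
)

// partitionProp is a toy data-distribution property: the columns the data
// is hash-partitioned by (nil means no particular distribution).
type partitionProp struct{ cols []string }

// satisfies reports whether data partitioned by `have` already meets a
// requirement to be partitioned by `need`; here, a simple exact match.
// (A subset-based rule is also conceivable, as the thread mentions for
// partition keys that are a subset of group-by keys.)
func satisfies(have, need partitionProp) bool {
	return reflect.DeepEqual(have.cols, need.cols)
}

// senderKind picks the exchange sender type per union branch, following the
// reviewer's sketch: passthrough when the child's distribution already
// satisfies the parent's requirement, hash partition otherwise.
func senderKind(child, required partitionProp) string {
	if satisfies(child, required) {
		return "PassThrough" // local send, no re-shuffle
	}
	return "HashPartition"
}

func main() {
	required := partitionProp{cols: []string{"a"}}
	aggChild := partitionProp{cols: []string{"a"}} // already partitioned by S.a
	scanChild := partitionProp{cols: nil}          // plain scan, no distribution
	fmt.Println(senderKind(aggChild, required))    // PassThrough
	fmt.Println(senderKind(scanChild, required))   // HashPartition
}
```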

Member Author:

Could you detail the passthrough exchanger? How could we guarantee the same task number between source tasks and target tasks?

planner/core/fragment.go (outdated; resolved)
}
return plan
}

func injectProjBelowUnion(un *PhysicalUnionAll) *PhysicalUnionAll {
Contributor:

Is it necessary? In the logical plan building stage, the field cast projection has already been appended.

Member Author:

It is! The check in the logical plan is looser; for example, it lacks the "NotNull" flag check.

Co-authored-by: Zhuomin Liu <lzmhhh123@gmail.com>
@ti-chi-bot (Member):

@eurekaka: /merge is only allowed for the committers, you can assign this pull request to the committer in list by filling /assign @committer in the comment to help merge this pull request.

In response to this:

/merge

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.

@eurekaka eurekaka added the sig/planner SIG: Planner label Jun 1, 2021
@eurekaka (Contributor) commented Jun 1, 2021:

/lgtm

@ti-chi-bot (Member):

@eurekaka: Please use GitHub review feature instead of /lgtm [cancel] when you want to submit review to the pull request.
For how to use GitHub review feature, see also this document provided by GitHub.

For the reason we drop support to the commands, see also this page.
This reply is being used as a temporary reply during the migration of review process and will be removed on July 1st.


@ti-chi-bot (Member):

[REVIEW NOTIFICATION]

This pull request has been approved by:

  • eurekaka
  • lzmhhh123

To complete the pull request process, please ask the reviewers in the list to review by filling /cc @reviewer in the comment.
After your PR has acquired the required number of LGTMs, you can assign this pull request to the committer in the list by filling /assign @committer in the comment to help you merge this pull request.

The full list of commands accepted by this bot can be found here.

Reviewer can indicate their review by submitting an approval review.
Reviewer can cancel approval by submitting a request changes review.

@ti-chi-bot ti-chi-bot added status/LGT2 Indicates that a PR has LGTM 2. and removed status/LGT1 Indicates that a PR has LGTM 1. labels Jun 1, 2021
@eurekaka (Contributor) commented Jun 1, 2021:

/merge

@ti-chi-bot (Member):

This pull request has been accepted and is ready to merge.

Commit hash: 9a7b2a7

@ti-chi-bot ti-chi-bot added the status/can-merge Indicates a PR has been approved by a committer. label Jun 1, 2021
// after untwist, there will be two plans in `forest` slice:
// - ExchangeSender -> Projection (c1) -> TableScan(t)
// - ExchangeSender -> Projection (c2) -> TableScan(s)
func untwistPlanAndRemoveUnionAll(stack []PhysicalPlan, forest *[]*PhysicalExchangeSender) error {
Contributor:

Can this change be seen by explain?

Member Author:

No, this can only be seen in the log.

@hanfei1991 (Member Author):

/run-all-tests

@ti-chi-bot ti-chi-bot merged commit 52e89cb into pingcap:master Jun 2, 2021
ti-srebot pushed a commit to ti-srebot/tidb that referenced this pull request Jun 2, 2021
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
@ti-srebot (Contributor):

cherry pick to release-5.0 in PR #25051

Labels
component/expression needs-cherry-pick-release-5.0 sig/execution SIG execution sig/planner SIG: Planner size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. status/can-merge Indicates a PR has been approved by a committer. status/LGT2 Indicates that a PR has LGTM 2.

6 participants