*: support to execute CTE on MPP side #42296
 recorededPlanIDs[r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.
-	RecordOneCopTask(planID, r.storeType.Name(), callee, detail)] = 0
+	RecordOneCopTask(-1, r.storeType.Name(), callee, detail)] = 0
Why change the old code here?
Hmm...
I spent a long time debugging but couldn't find out why the original code panicked. Passing -1 always works, though.
for _, task := range tasks {
	addr := task.Meta.GetAddress()
	// for upper fragment, the task num is equal to address num covered by lower tasks
	_, ok := addressMap[addr]
	if _, okk := cteAddrMap[addr]; !okk && len(cteAddrMap) > 0 {
How should I understand this? If a child task's address is not in cteProducerAddrs and cteProducerAddrs is not empty, do we skip this task?
The possible workers are decided from bottom to top, so any address that appears in the child fragments must also appear in the parent fragments.
I'm still confused here. Take this case:
3 TiFlash nodes: A, B, C
one CTE producer runs on A and B
When a shared CTE is one side of a join, the base table side should be left unmoved and the other side is broadcast.
join
+-- base table (unmoved)
+-- receiver2 (CTE task from A, B)
I see. So your code here means that when the operator consuming the CTE is a join or something similar, we can always leave one side unmoved and keep the CTE as close to the data as possible?
After reading the design again: this aligns the workers. Makes sense.
Yes. The current approach is easy to implement, but it doesn't use all the nodes of our MPP cluster.
We could support a more advanced n:m sending strategy that keeps most of the computation on the local node while still using all the MPP nodes, but that is not part of this PR.
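To make the worker alignment discussed in this thread concrete, here is a minimal sketch in Go. It is not the code from this PR: `mppTask` and `alignWithCTEProducers` are hypothetical names, and the sketch assumes the reviewer's reading that a task is skipped when its address hosts no CTE producer and the producer set is non-empty.

```go
package main

import "fmt"

// mppTask is a hypothetical stand-in for an MPP task; only the address matters here.
type mppTask struct {
	ID   int
	Addr string
}

// alignWithCTEProducers keeps only the tasks whose address also hosts a CTE producer.
// An empty producer set means there is no CTE constraint, so every task is kept.
func alignWithCTEProducers(tasks []mppTask, cteAddrs map[string]struct{}) []mppTask {
	if len(cteAddrs) == 0 {
		return tasks
	}
	kept := make([]mppTask, 0, len(tasks))
	for _, t := range tasks {
		if _, ok := cteAddrs[t.Addr]; ok {
			kept = append(kept, t)
		}
	}
	return kept
}

func main() {
	// Three TiFlash nodes A, B, C; the CTE producer runs on A and B only.
	tasks := []mppTask{{1, "A"}, {2, "B"}, {3, "C"}}
	producers := map[string]struct{}{"A": {}, "B": {}}
	for _, t := range alignWithCTEProducers(tasks, producers) {
		fmt.Println(t.ID, t.Addr)
	}
}
```

With the A/B/C case from the thread, only the tasks on A and B survive, which is the trade-off mentioned above: simple to implement, but node C stays idle for this fragment.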
Please add some test cases at least to show the new execution plan.
go.mod
Outdated
@@ -281,5 +281,6 @@ replace (
	// fix potential security issue(CVE-2020-26160) introduced by indirect dependency.
	github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible
	github.com/pingcap/tidb/parser => ./parser
	github.com/pingcap/tipb => github.com/pingcap/tipb v0.0.0-20230328072712-dd18a6bb40f1
Why is this modified?
You can see the changes in plan_to_pb.go.
I mean, why are you using replace?
The PR on that side hasn't been merged yet. Since you haven't reviewed fragment.go yet, the pb might still change as a result of your review.
planner/core/optimizer.go
Outdated
func DoOptimize(ctx context.Context, sctx sessionctx.Context, flag uint64, logic LogicalPlan) (PhysicalPlan, float64, error) {
	_, finalPlan, cost, err := DoOptimizeAndLogicAsRet(ctx, sctx, flag, logic)
	return finalPlan, cost, err
Why do we need both DoOptimize and DoOptimizeAndLogicAsRet?
I think DoOptimizeAndLogicAsRet can do it all.
Many tests use DoOptimize, so keeping both avoids unnecessary changes in this PR. :joy:
planner/core/find_best_task.go
Outdated
@@ -478,6 +587,107 @@ END:
	return bestTask, cntPlan, nil
}

// findBestTask implements LogicalPlan interface.
func (p *LogicalSequence) findBestTask(prop *property.PhysicalProperty, planCounter *PlanCounterTp, opt *physicalOptimizeOp) (bestTask task, cntPlan int64, err error) {
What's the difference between (p *baseLogicalPlan) findBestTask and (p *LogicalSequence) findBestTask?
This seems to be the same as baseLogicalPlan.findBestTask.
+1
@@ -133,6 +133,8 @@ const (
	TypeForeignKeyCheck = "Foreign_Key_Check"
	// TypeForeignKeyCascade is the type of FKCascade
	TypeForeignKeyCascade = "Foreign_Key_Cascade"
	// TypeSequence
	TypeSequence = "Sequence"
Please make sure the new execution plan can be displayed correctly in slow log, stmt summary, dashboard...
planner/core/find_best_task.go
Outdated
@@ -289,6 +289,115 @@ func (p *baseLogicalPlan) enumeratePhysicalPlans4Task(physicalPlans []PhysicalPl
	return bestTask, cntPlan, nil
}

func (p *LogicalSequence) enumeratePhysicalPlans4Task(physicalPlans []PhysicalPlan,
(p *LogicalSequence) enumeratePhysicalPlans4Task and (p *baseLogicalPlan) enumeratePhysicalPlans4Task are 80% the same. I think it's better not to copy it.
But is there a good way to merge them? I haven't come up with one, which is why the code is the way it is now.
You can just check if p.self is a LogicalSequence in (p *baseLogicalPlan) enumeratePhysicalPlans4Task.
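A minimal sketch of that suggestion, using simplified hypothetical types rather than TiDB's real planner structs: the shared method on the base type uses a type assertion on `self` to branch into the sequence-specific step, so the rest of the function body does not need to be copied.

```go
package main

import "fmt"

// logicalPlan is a hypothetical, heavily simplified plan interface.
type logicalPlan interface{ name() string }

// basePlan mimics a base struct that stores a pointer back to the concrete plan.
type basePlan struct{ self logicalPlan }

type logicalJoin struct{ basePlan }

func (*logicalJoin) name() string { return "Join" }

type logicalSequence struct{ basePlan }

func (*logicalSequence) name() string { return "Sequence" }

// enumerate is the single shared implementation. The type assertion selects the
// sequence-specific handling; everything else stays common to all plans.
func (p *basePlan) enumerate() string {
	if _, ok := p.self.(*logicalSequence); ok {
		return "sequence-specific handling + shared enumeration"
	}
	return "shared enumeration"
}

func main() {
	seq := &logicalSequence{}
	seq.self = seq
	join := &logicalJoin{}
	join.self = join
	fmt.Println(seq.name(), "=>", seq.enumerate())
	fmt.Println(join.name(), "=>", join.enumerate())
}
```

The cost of this design is that the base type now knows about one concrete subtype, which is usually acceptable when the alternative is duplicating most of a long function.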
type cteConsumerStatus int

const (
	NoCTE cteConsumerStatus = iota
	SomeCTEFailedMpp
	AllCTECanMpp
)
I think
- the name should be "producer" status instead of "consumer" status.
- two values are enough.
-func (p *LogicalCTE) findBestTask(prop *property.PhysicalProperty, _ *PlanCounterTp, _ *physicalOptimizeOp) (t task, cntPlan int64, err error) {
+func (p *LogicalCTE) findBestTask(prop *property.PhysicalProperty, counter *PlanCounterTp, pop *physicalOptimizeOp) (t task, cntPlan int64, err error) {
+	if len(p.children) > 0 {
+		return p.baseLogicalPlan.findBestTask(prop, counter, pop)
When should we set the children for LogicalCTE? (children is a field it already has; did we just not use it before?)
Currently, we use whether it has children to identify whether it is a producer or a consumer.
Got it, makes sense. It would be better to add a comment about this above.
If I understand it correctly, "CTE storage" and "CTE producer" are the same thing, and "CTE reader" and "CTE consumer" are the same thing.
I think unifying the naming is better for understanding.
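As a rough illustration of the convention described in this thread, the sketch below classifies a CTE node by whether it has children. The types are hypothetical, and the mapping (children present means producer, no children means consumer) is an assumption inferred from the quoted diff; the thread does not state the direction explicitly.

```go
package main

import "fmt"

// cteRole uses the unified producer/consumer naming suggested above.
type cteRole int

const (
	cteConsumer cteRole = iota // assumed: a leaf node that only reads the CTE result
	cteProducer                // assumed: a node that owns the plan computing the CTE
)

// plan is a hypothetical, minimal plan node.
type plan struct {
	name     string
	children []*plan
}

// roleOf applies the "has children" rule from the discussion.
func roleOf(cte *plan) cteRole {
	if len(cte.children) > 0 {
		return cteProducer
	}
	return cteConsumer
}

func main() {
	producer := &plan{name: "CTE", children: []*plan{{name: "TableScan"}}}
	consumer := &plan{name: "CTE"}
	fmt.Println(roleOf(producer) == cteProducer, roleOf(consumer) == cteConsumer)
}
```

Keeping the rule in one small helper, with the naming fixed in one place, is one way to address both the request for a comment and the request for unified terminology.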
planner/core/fragment.go
Outdated
	return tasks, nil
}

// flipCTEReader fix the plan tree. Before we enter the func. The plan tree is like ParentPlan->CTEConsumer->ExchangeReceiver.
// The CTEConsumer has no real meaning in MPP's execution. We prune it to make the plan become ParentPlan->ExchangeReceiver.
// But the Recevier needs a schema since itself doesn't hold the schema. So the final plan become ParentPlan->ExchangeRecevier->CTEConsumer.
typo of "receive"
I think the result is just "ParentPlan->ExchangeRecevier".
I couldn't find where we put the PhysicalCTE under the PhysicalExchangeReceiver.
It's in generateTasksForCTEReader.
Please make it more clear in the comments then.
I want to do that in the next PR.
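The tree shape described in the flipCTEReader comment can be sketched as follows. This only illustrates the before and after shapes named in that comment, on a hypothetical single-child `planNode`; it does not reflect how the PR splits the work between flipCTEReader and generateTasksForCTEReader.

```go
package main

import "fmt"

// planNode is a hypothetical single-child plan node used only for illustration.
type planNode struct {
	Name  string
	Child *planNode
}

// flipCTEReader rewrites Parent->CTEConsumer->ExchangeReceiver into
// Parent->ExchangeReceiver->CTEConsumer, so the receiver sits directly under the
// parent while the consumer stays attached below it as the schema holder.
func flipCTEReader(parent *planNode) {
	consumer := parent.Child
	if consumer == nil || consumer.Name != "CTEConsumer" {
		return
	}
	receiver := consumer.Child
	if receiver == nil || receiver.Name != "ExchangeReceiver" {
		return
	}
	consumer.Child = receiver.Child
	receiver.Child = consumer
	parent.Child = receiver
}

// chain renders the single-child tree as "A->B->C".
func chain(n *planNode) string {
	s := ""
	for ; n != nil; n = n.Child {
		if s != "" {
			s += "->"
		}
		s += n.Name
	}
	return s
}

func main() {
	root := &planNode{"ParentPlan", &planNode{"CTEConsumer", &planNode{"ExchangeReceiver", nil}}}
	fmt.Println(chain(root))
	flipCTEReader(root)
	fmt.Println(chain(root))
}
```

Writing the before and after chains in the function comment, as the sketch does, is one way to make the intent clearer for readers who hit the same confusion as the reviewer.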
/review default
/merge
This pull request has been accepted and is ready to merge. Commit hash: 34864c8
What problem does this PR solve?
Issue Number: close #43333
Problem Summary:
This PR adds support for executing CTEs on the MPP side.
There's a detailed design in this doc.
What is changed and how it works?
Refer to the detailed design mentioned above to see how the code works.