*: support to execute CTE on MPP side #42296
@@ -16,10 +16,12 @@ package executor

import (
    "context"
    "fmt"
    "time"

    "github.com/pingcap/errors"
    "github.com/pingcap/failpoint"
    "github.com/pingcap/kvproto/pkg/mpp"
    "github.com/pingcap/tidb/distsql"
    "github.com/pingcap/tidb/infoschema"
    "github.com/pingcap/tidb/kv"
@@ -99,6 +101,10 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
    if err != nil {
        return errors.Trace(err)
    }
    err = e.fixTaskForCTEStorageAndReader(dagReq.RootExecutor, mppTask.Meta)
Review comment: why should we align the address after the pb is generated?

Reply: It's because each task of the CTE producer/consumer needs a different upstream/downstream compared with the other tasks in the same fragment.

Reply: Consider that there are two producer tasks A and B; A's and B's upstream consumer tasks are different.
    if err != nil {
        return err
    }
    pbData, err := dagReq.Marshal()
    if err != nil {
        return errors.Trace(err)
@@ -127,6 +133,88 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
    return nil
}

// fixTaskForCTEStorageAndReader fixes the upstream/downstream tasks for the producers and consumers.

Review comment: Please write some meaningful comments.

// After the fragments are split, a CTE producer in a fragment holds the task addresses of all of its consumers.
// For example, suppose the producer has two tasks, one on node_1 and one on node_2, and each consumer also has two tasks on the same nodes (node_1 and node_2).
// For the producer task on node_1 we prune the node_2 addresses, because that task should only send to the consumer tasks on node_1.
// The same applies to the producer task on node_2.
// Likewise for the consumer tasks: we prune the unnecessary addresses of their producer tasks (i.e. the downstream tasks).
func (e *MPPGather) fixTaskForCTEStorageAndReader(exec *tipb.Executor, meta kv.MPPTaskMeta) error {
    children := make([]*tipb.Executor, 0, 2)
    switch exec.Tp {
    case tipb.ExecType_TypeTableScan, tipb.ExecType_TypePartitionTableScan, tipb.ExecType_TypeIndexScan:
    case tipb.ExecType_TypeSelection:
        children = append(children, exec.Selection.Child)
    case tipb.ExecType_TypeAggregation, tipb.ExecType_TypeStreamAgg:
        children = append(children, exec.Aggregation.Child)
    case tipb.ExecType_TypeTopN:
        children = append(children, exec.TopN.Child)
    case tipb.ExecType_TypeLimit:
        children = append(children, exec.Limit.Child)
    case tipb.ExecType_TypeExchangeSender:
        children = append(children, exec.ExchangeSender.Child)
        if len(exec.ExchangeSender.UpstreamCteTaskMeta) == 0 {
            break
        }
        actualUpStreamTasks := make([][]byte, 0, len(exec.ExchangeSender.UpstreamCteTaskMeta))
        actualTIDs := make([]int64, 0, len(exec.ExchangeSender.UpstreamCteTaskMeta))
Review comment: unused variable?

Reply: It's used for the debug log.
        for _, tasksFromOneConsumer := range exec.ExchangeSender.UpstreamCteTaskMeta {
            for _, taskBytes := range tasksFromOneConsumer.EncodedTasks {
                taskMeta := &mpp.TaskMeta{}
                err := taskMeta.Unmarshal(taskBytes)
Review comment: Really strange and complex code.

Reply: Yeah, but it's difficult to change, since the tasks of one fragment are only split out at this point.
                if err != nil {
                    return err
                }
                if taskMeta.Address != meta.GetAddress() {
                    continue
                }
                actualUpStreamTasks = append(actualUpStreamTasks, taskBytes)
                actualTIDs = append(actualTIDs, taskMeta.TaskId)
            }
        }
        logutil.BgLogger().Warn("refine tunnel for cte producer task", zap.String("the final tunnel", fmt.Sprintf("up stream consumer tasks: %v", actualTIDs)))
        exec.ExchangeSender.EncodedTaskMeta = actualUpStreamTasks
    case tipb.ExecType_TypeExchangeReceiver:
        if len(exec.ExchangeReceiver.OriginalCtePrdocuerTaskMeta) == 0 {
            break
        }
        exec.ExchangeReceiver.EncodedTaskMeta = [][]byte{}
        actualTIDs := make([]int64, 0, 4)
        for _, taskBytes := range exec.ExchangeReceiver.OriginalCtePrdocuerTaskMeta {
            taskMeta := &mpp.TaskMeta{}
            err := taskMeta.Unmarshal(taskBytes)
            if err != nil {
                return err
            }
            if taskMeta.Address != meta.GetAddress() {
                continue
            }
            exec.ExchangeReceiver.EncodedTaskMeta = append(exec.ExchangeReceiver.EncodedTaskMeta, taskBytes)
            actualTIDs = append(actualTIDs, taskMeta.TaskId)
        }
        logutil.BgLogger().Warn("refine tunnel for cte consumer task", zap.String("the final tunnel", fmt.Sprintf("down stream producer task: %v", actualTIDs)))
    case tipb.ExecType_TypeJoin:
        children = append(children, exec.Join.Children...)
    case tipb.ExecType_TypeProjection:
        children = append(children, exec.Projection.Child)
    case tipb.ExecType_TypeWindow:
        children = append(children, exec.Window.Child)
    case tipb.ExecType_TypeSort:
        children = append(children, exec.Sort.Child)
    case tipb.ExecType_TypeExpand:
        children = append(children, exec.Expand.Child)
    default:
        return errors.Errorf("unknown new tipb protocol %d", exec.Tp)
    }
    for _, child := range children {
        err := e.fixTaskForCTEStorageAndReader(child, meta)
        if err != nil {
            return err
        }
    }
    return nil
}
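To make the pruning rule discussed above concrete, here is a minimal, standalone sketch (not part of this PR): given the encoded metas of the peer tasks and the address of the current task, keep only the peers that live on the same node. The helper name pruneTaskMetasByAddress and the node addresses are made up for illustration; mpp.TaskMeta and its fields come from github.com/pingcap/kvproto/pkg/mpp as used in the diff.

// A minimal sketch of the address-based pruning applied by
// fixTaskForCTEStorageAndReader to both the ExchangeSender and
// ExchangeReceiver sides. pruneTaskMetasByAddress is a hypothetical helper.
package main

import (
    "fmt"

    "github.com/pingcap/kvproto/pkg/mpp"
)

func pruneTaskMetasByAddress(encodedMetas [][]byte, addr string) (kept [][]byte, keptIDs []int64, err error) {
    for _, raw := range encodedMetas {
        meta := &mpp.TaskMeta{}
        if err := meta.Unmarshal(raw); err != nil {
            return nil, nil, err
        }
        // Drop peer tasks that run on a different node.
        if meta.Address != addr {
            continue
        }
        kept = append(kept, raw)
        keptIDs = append(keptIDs, meta.TaskId)
    }
    return kept, keptIDs, nil
}

func main() {
    // Two peer tasks, one per node; only the one on node_1 should survive
    // when pruning for a task that also runs on node_1.
    var peers [][]byte
    for id, addr := range map[int64]string{1: "node_1:3930", 2: "node_2:3930"} {
        raw, _ := (&mpp.TaskMeta{TaskId: id, Address: addr}).Marshal()
        peers = append(peers, raw)
    }
    kept, ids, err := pruneTaskMetasByAddress(peers, "node_1:3930")
    fmt.Println(len(kept), ids, err) // 1 [1] <nil>
}

In the PR itself the same filter is applied per dispatch request: to ExchangeSender.UpstreamCteTaskMeta (the result replacing EncodedTaskMeta) and to ExchangeReceiver.OriginalCtePrdocuerTaskMeta, using the address of the task being built.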

func collectPlanIDS(plan plannercore.PhysicalPlan, ids []int) []int {
    ids = append(ids, plan.ID())
    for _, child := range plan.Children() {
Review comment: why change the old here?

Reply: It's, emmm... I debugged for a long time but couldn't see why the original code panicked. Passing -1 always works, though.