From bc676ea97e4fcf0a56a7e17471cea17885c4bf8c Mon Sep 17 00:00:00 2001 From: Han Fei Date: Wed, 2 Jun 2021 14:15:38 +0800 Subject: [PATCH 1/7] cherry pick #24287 to release-5.0 Signed-off-by: ti-srebot --- executor/mpp_gather.go | 24 ++-- executor/tiflash_test.go | 42 ++++++ expression/column.go | 3 +- planner/core/exhaust_physical_plans.go | 41 +++++- planner/core/fragment.go | 139 ++++++++++++++++--- planner/core/initialize.go | 2 +- planner/core/logical_plan_builder.go | 4 + planner/core/physical_plans.go | 45 +++++- planner/core/plan.go | 3 + planner/core/rule_inject_extra_projection.go | 33 +++++ planner/core/task.go | 24 ++++ store/copr/mpp.go | 1 + 12 files changed, 322 insertions(+), 39 deletions(-) diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index d34d63be49a79..536a06eda8993 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -50,7 +50,7 @@ type MPPGather struct { respIter distsql.SelectResult } -func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.MPPTask, isRoot bool) error { +func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error { dagReq, _, err := constructDAGReq(e.ctx, []plannercore.PhysicalPlan{pf.ExchangeSender}, kv.TiFlash) if err != nil { return errors.Trace(err) @@ -58,12 +58,12 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.M for i := range pf.ExchangeSender.Schema().Columns { dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i)) } - if !isRoot { + if !pf.IsRoot { dagReq.EncodeType = tipb.EncodeType_TypeCHBlock } else { dagReq.EncodeType = tipb.EncodeType_TypeChunk } - for _, mppTask := range tasks { + for _, mppTask := range pf.ExchangeSender.Tasks { err := updateExecutorTableID(context.Background(), dagReq.RootExecutor, mppTask.TableID, true) if err != nil { return errors.Trace(err) @@ -77,7 +77,7 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.M Data: pbData, Meta: mppTask.Meta, ID: mppTask.ID, - IsRoot: isRoot, + IsRoot: pf.IsRoot, Timeout: 10, SchemaVar: e.is.SchemaMetaVersion(), StartTs: e.startTS, @@ -85,12 +85,6 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.M } e.mppReqs = append(e.mppReqs, req) } - for _, r := range pf.ExchangeReceivers { - err = e.appendMPPDispatchReq(r.GetExchangeSender().Fragment, r.Tasks, false) - if err != nil { - return errors.Trace(err) - } - } return nil } @@ -108,13 +102,15 @@ func (e *MPPGather) Open(ctx context.Context) (err error) { // TODO: Move the construct tasks logic to planner, so we can see the explain results. 
sender := e.originalPlan.(*plannercore.PhysicalExchangeSender) planIDs := collectPlanIDS(e.originalPlan, nil) - rootTasks, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, sender, e.is) + frags, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, sender, e.is) if err != nil { return errors.Trace(err) } - err = e.appendMPPDispatchReq(sender.Fragment, rootTasks, true) - if err != nil { - return errors.Trace(err) + for _, frag := range frags { + err = e.appendMPPDispatchReq(frag) + if err != nil { + return errors.Trace(err) + } } failpoint.Inject("checkTotalMPPTasks", func(val failpoint.Value) { if val.(int) != len(e.mppReqs) { diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index b49630448c6f0..4718957de2b8b 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -422,6 +422,48 @@ func (s *tiflashTestSuite) TestMppGoroutinesExitFromErrors(c *C) { c.Assert(failpoint.Disable(hang), IsNil) } +func (s *tiflashTestSuite) TestMppUnionAll(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists x1") + tk.MustExec("create table x1(a int , b int);") + tk.MustExec("alter table x1 set tiflash replica 1") + tk.MustExec("drop table if exists x2") + tk.MustExec("create table x2(a int , b int);") + tk.MustExec("alter table x2 set tiflash replica 1") + tb := testGetTableByName(c, tk.Se, "test", "x1") + err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) + c.Assert(err, IsNil) + tb = testGetTableByName(c, tk.Se, "test", "x2") + err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) + c.Assert(err, IsNil) + + tk.MustExec("insert into x1 values (1, 1), (2, 2), (3, 3), (4, 4)") + tk.MustExec("insert into x2 values (5, 1), (2, 2), (3, 3), (4, 4)") + + // test join + union (join + select) + tk.MustQuery("select x1.a, x.a from x1 left join (select x2.b a, x1.b from x1 join x2 on x1.a = x2.b union all select * from x1 ) x on x1.a = x.a order by x1.a").Check(testkit.Rows("1 1", "1 1", "2 2", "2 2", "3 3", "3 3", "4 4", "4 4")) + tk.MustQuery("select x1.a, x.a from x1 left join (select count(*) a, sum(b) b from x1 group by a union all select * from x2 ) x on x1.a = x.a order by x1.a").Check(testkit.Rows("1 1", "1 1", "1 1", "1 1", "2 2", "3 3", "4 4")) + + tk.MustExec("drop table if exists x3") + tk.MustExec("create table x3(a int , b int);") + tk.MustExec("alter table x3 set tiflash replica 1") + tb = testGetTableByName(c, tk.Se, "test", "x3") + err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) + c.Assert(err, IsNil) + + tk.MustExec("insert into x3 values (2, 2), (2, 3), (2, 4)") + // test nested union all + tk.MustQuery("select count(*) from (select a, b from x1 union all select a, b from x3 union all (select x1.a, x3.b from (select * from x3 union all select * from x2) x3 left join x1 on x3.a = x1.b))").Check(testkit.Rows("14")) + // test union all join union all + tk.MustQuery("select count(*) from (select * from x1 union all select * from x2 union all select * from x3) x join (select * from x1 union all select * from x2 union all select * from x3) y on x.a = y.b").Check(testkit.Rows("29")) + tk.MustExec("set @@session.tidb_broadcast_join_threshold_count=100000") + failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(6)`) + tk.MustQuery("select count(*) from (select * from x1 union all select * from x2 union all select * from x3) x join (select * from x1 union all select * from x2 union 
all select * from x3) y on x.a = y.b").Check(testkit.Rows("29")) + failpoint.Disable("github.com/pingcap/tidb/executor/checkTotalMPPTasks") + +} + func (s *tiflashTestSuite) TestMppApply(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/expression/column.go b/expression/column.go index 006b9a3867cda..ebc0feaf93a06 100644 --- a/expression/column.go +++ b/expression/column.go @@ -38,10 +38,9 @@ type CorrelatedColumn struct { // Clone implements Expression interface. func (col *CorrelatedColumn) Clone() Expression { - var d types.Datum return &CorrelatedColumn{ Column: col.Column, - Data: &d, + Data: col.Data, } } diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index fea4d3524c22d..cb656b39e5edb 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -2124,7 +2124,7 @@ func (p *baseLogicalPlan) canPushToCop(storeTp kv.StoreType) bool { } } ret = ret && validDs - case *LogicalAggregation, *LogicalProjection, *LogicalSelection, *LogicalJoin: + case *LogicalAggregation, *LogicalProjection, *LogicalSelection, *LogicalJoin, *LogicalUnionAll: if storeTp == kv.TiFlash { ret = ret && c.canPushToCop(storeTp) } else { @@ -2492,16 +2492,51 @@ func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { // TODO: UnionAll can not pass any order, but we can change it to sort merge to keep order. +<<<<<<< HEAD if !prop.IsEmpty() || prop.IsFlashProp() { return nil, true +======= + if !prop.IsEmpty() || (prop.IsFlashProp() && prop.TaskTp != property.MppTaskType) { + return nil, true, nil } + // TODO: UnionAll can pass partition info, but for briefness, we prevent it from pushing down. + if prop.TaskTp == property.MppTaskType && prop.PartitionTp != property.AnyType { + return nil, true, nil +>>>>>>> 52e89cb8b... planner/core: support union all for mpp. (#24287) + } + canUseMpp := p.ctx.GetSessionVars().IsMPPAllowed() && p.canPushToCop(kv.TiFlash) chReqProps := make([]*property.PhysicalProperty, 0, len(p.children)) for range p.children { - chReqProps = append(chReqProps, &property.PhysicalProperty{ExpectedCnt: prop.ExpectedCnt}) + if canUseMpp && prop.TaskTp == property.MppTaskType { + chReqProps = append(chReqProps, &property.PhysicalProperty{ + ExpectedCnt: prop.ExpectedCnt, + TaskTp: property.MppTaskType, + }) + } else { + chReqProps = append(chReqProps, &property.PhysicalProperty{ExpectedCnt: prop.ExpectedCnt}) + } } - ua := PhysicalUnionAll{}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...) + ua := PhysicalUnionAll{ + mpp: canUseMpp && prop.TaskTp == property.MppTaskType, + }.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...) ua.SetSchema(p.Schema()) +<<<<<<< HEAD return []PhysicalPlan{ua}, true +======= + if canUseMpp && prop.TaskTp == property.RootTaskType { + chReqProps = make([]*property.PhysicalProperty, 0, len(p.children)) + for range p.children { + chReqProps = append(chReqProps, &property.PhysicalProperty{ + ExpectedCnt: prop.ExpectedCnt, + TaskTp: property.MppTaskType, + }) + } + mppUA := PhysicalUnionAll{mpp: true}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...) + mppUA.SetSchema(p.Schema()) + return []PhysicalPlan{ua, mppUA}, true, nil + } + return []PhysicalPlan{ua}, true, nil +>>>>>>> 52e89cb8b... planner/core: support union all for mpp. 
(#24287)
 }
 
 func (p *LogicalPartitionUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) {
diff --git a/planner/core/fragment.go b/planner/core/fragment.go
index 911855ea22aa9..f329374d853f5 100644
--- a/planner/core/fragment.go
+++ b/planner/core/fragment.go
@@ -38,32 +38,49 @@ type Fragment struct {
 
 	// following fields are filled after scheduling.
 	ExchangeSender *PhysicalExchangeSender // data exporter
+
+	IsRoot bool
+}
+
+type tasksAndFrags struct {
+	tasks []*kv.MPPTask
+	frags []*Fragment
 }
 
 type mppTaskGenerator struct {
 	ctx     sessionctx.Context
 	startTS uint64
 	is      infoschema.InfoSchema
+	frags   []*Fragment
+	cache   map[int]tasksAndFrags
 }
 
 // GenerateRootMPPTasks generate all mpp tasks and return root ones.
-func GenerateRootMPPTasks(ctx sessionctx.Context, startTs uint64, sender *PhysicalExchangeSender, is infoschema.InfoSchema) ([]*kv.MPPTask, error) {
-	g := &mppTaskGenerator{ctx: ctx, startTS: startTs, is: is}
+func GenerateRootMPPTasks(ctx sessionctx.Context, startTs uint64, sender *PhysicalExchangeSender, is infoschema.InfoSchema) ([]*Fragment, error) {
+	g := &mppTaskGenerator{
+		ctx:     ctx,
+		startTS: startTs,
+		is:      is,
+		cache:   make(map[int]tasksAndFrags),
+	}
 	return g.generateMPPTasks(sender)
 }
 
-func (e *mppTaskGenerator) generateMPPTasks(s *PhysicalExchangeSender) ([]*kv.MPPTask, error) {
+func (e *mppTaskGenerator) generateMPPTasks(s *PhysicalExchangeSender) ([]*Fragment, error) {
 	logutil.BgLogger().Info("Mpp will generate tasks", zap.String("plan", ToString(s)))
 	tidbTask := &kv.MPPTask{
 		StartTs: e.startTS,
 		ID:      -1,
 	}
-	rootTasks, err := e.generateMPPTasksForFragment(s)
+	_, frags, err := e.generateMPPTasksForExchangeSender(s)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	s.TargetTasks = []*kv.MPPTask{tidbTask}
-	return rootTasks, nil
+	for _, frag := range frags {
+		frag.ExchangeSender.TargetTasks = []*kv.MPPTask{tidbTask}
+		frag.IsRoot = true
+	}
+	return e.frags, nil
 }
 
 type mppAddr struct {
@@ -105,6 +122,8 @@ func (f *Fragment) init(p PhysicalPlan) error {
 		f.TableScan = x
 	case *PhysicalExchangeReceiver:
 		f.ExchangeReceivers = append(f.ExchangeReceivers, x)
+	case *PhysicalUnionAll:
+		return errors.New("unexpected union all detected")
 	default:
 		for _, ch := range p.Children() {
 			if err := f.init(ch); err != nil {
@@ -115,20 +134,107 @@ func (f *Fragment) init(p PhysicalPlan) error {
 	return nil
 }
 
-func newFragment(s *PhysicalExchangeSender) (*Fragment, error) {
-	f := &Fragment{ExchangeSender: s}
-	s.Fragment = f
-	err := f.init(s)
-	return f, errors.Trace(err)
+// We would remove all the union-all operators by 'untwist'ing and copying the plans above union-all.
+// This will make every route from root (ExchangeSender) to leaf nodes (ExchangeReceiver and TableScan)
+// a new isolated tree (and also a fragment) without union all. These trees (fragments then tasks) will
+// finally be gathered to TiDB or be exchanged to upper tasks again.
+// For instance, given a plan "select c1 from t union all select c1 from s"
+// after untwist, there will be two plans in `forest` slice:
+// - ExchangeSender -> Projection (c1) -> TableScan(t)
+// - ExchangeSender -> Projection (c2) -> TableScan(s)
+func untwistPlanAndRemoveUnionAll(stack []PhysicalPlan, forest *[]*PhysicalExchangeSender) error {
+	cur := stack[len(stack)-1]
+	switch x := cur.(type) {
+	case *PhysicalTableScan, *PhysicalExchangeReceiver: // This should be the leaf node.
+ p, err := stack[0].Clone() + if err != nil { + return errors.Trace(err) + } + *forest = append(*forest, p.(*PhysicalExchangeSender)) + for i := 1; i < len(stack); i++ { + if _, ok := stack[i].(*PhysicalUnionAll); ok { + continue + } + ch, err := stack[i].Clone() + if err != nil { + return errors.Trace(err) + } + if join, ok := p.(*PhysicalHashJoin); ok { + join.SetChild(1-join.InnerChildIdx, ch) + } else { + p.SetChildren(ch) + } + p = ch + } + case *PhysicalHashJoin: + stack = append(stack, x.children[1-x.InnerChildIdx]) + err := untwistPlanAndRemoveUnionAll(stack, forest) + stack = stack[:len(stack)-1] + return errors.Trace(err) + case *PhysicalUnionAll: + for _, ch := range x.children { + stack = append(stack, ch) + err := untwistPlanAndRemoveUnionAll(stack, forest) + stack = stack[:len(stack)-1] + if err != nil { + return errors.Trace(err) + } + } + default: + if len(cur.Children()) != 1 { + return errors.Trace(errors.New("unexpected plan " + cur.ExplainID().String())) + } + ch := cur.Children()[0] + stack = append(stack, ch) + err := untwistPlanAndRemoveUnionAll(stack, forest) + stack = stack[:len(stack)-1] + return errors.Trace(err) + } + return nil } -func (e *mppTaskGenerator) generateMPPTasksForFragment(s *PhysicalExchangeSender) (tasks []*kv.MPPTask, err error) { - f, err := newFragment(s) +func buildFragments(s *PhysicalExchangeSender) ([]*Fragment, error) { + forest := make([]*PhysicalExchangeSender, 0, 1) + err := untwistPlanAndRemoveUnionAll([]PhysicalPlan{s}, &forest) if err != nil { return nil, errors.Trace(err) } + fragments := make([]*Fragment, 0, len(forest)) + for _, s := range forest { + f := &Fragment{ExchangeSender: s} + err = f.init(s) + if err != nil { + return nil, errors.Trace(err) + } + fragments = append(fragments, f) + } + return fragments, nil +} + +func (e *mppTaskGenerator) generateMPPTasksForExchangeSender(s *PhysicalExchangeSender) ([]*kv.MPPTask, []*Fragment, error) { + if cached, ok := e.cache[s.ID()]; ok { + return cached.tasks, cached.frags, nil + } + frags, err := buildFragments(s) + if err != nil { + return nil, nil, errors.Trace(err) + } + results := make([]*kv.MPPTask, 0, len(frags)) + for _, f := range frags { + tasks, err := e.generateMPPTasksForFragment(f) + if err != nil { + return nil, nil, errors.Trace(err) + } + results = append(results, tasks...) + } + e.frags = append(e.frags, frags...) + e.cache[s.ID()] = tasksAndFrags{results, frags} + return results, frags, nil +} + +func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv.MPPTask, err error) { for _, r := range f.ExchangeReceivers { - r.Tasks, err = e.generateMPPTasksForFragment(r.GetExchangeSender()) + r.Tasks, r.frags, err = e.generateMPPTasksForExchangeSender(r.GetExchangeSender()) if err != nil { return nil, errors.Trace(err) } @@ -149,8 +255,9 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(s *PhysicalExchangeSender return nil, errors.New("cannot find mpp task") } for _, r := range f.ExchangeReceivers { - s := r.GetExchangeSender() - s.TargetTasks = tasks + for _, frag := range r.frags { + frag.ExchangeSender.TargetTasks = append(frag.ExchangeSender.TargetTasks, tasks...) 
+ } } f.ExchangeSender.Tasks = tasks return tasks, nil diff --git a/planner/core/initialize.go b/planner/core/initialize.go index c63d4efa7ba31..7fff4f8b722b1 100644 --- a/planner/core/initialize.go +++ b/planner/core/initialize.go @@ -419,7 +419,7 @@ func (p PhysicalTableReader) Init(ctx sessionctx.Context, offset int) *PhysicalT if p.tablePlan != nil { p.TablePlans = flattenPushDownPlan(p.tablePlan) p.schema = p.tablePlan.Schema() - if p.StoreType == kv.TiFlash && !p.GetTableScan().KeepOrder { + if p.StoreType == kv.TiFlash && p.GetTableScan() != nil && !p.GetTableScan().KeepOrder { // When allow batch cop is 1, only agg / topN uses batch cop. // When allow batch cop is 2, every query uses batch cop. switch ctx.GetSessionVars().AllowBatchCop { diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index f47f43e79a35d..96a22dd092d46 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -1348,6 +1348,10 @@ func (b *PlanBuilder) buildProjection4Union(ctx context.Context, u *LogicalUnion b.optFlag |= flagEliminateProjection proj := LogicalProjection{Exprs: exprs, AvoidColumnEvaluator: true}.Init(b.ctx, b.getSelectOffset()) proj.SetSchema(u.schema.Clone()) + // reset the schema type to make the "not null" flag right. + for i, expr := range exprs { + proj.schema.Columns[i].RetType = expr.GetType() + } proj.SetChildren(child) u.children[childID] = proj } diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 43304971b4680..e9c3ec51dc471 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -108,7 +108,10 @@ func (p *PhysicalTableReader) GetTableScan() *PhysicalTableScan { } else if chCnt == 1 { curPlan = curPlan.Children()[0] } else { - join := curPlan.(*PhysicalHashJoin) + join, ok := curPlan.(*PhysicalHashJoin) + if !ok { + return nil + } curPlan = join.children[1-join.globalChildIndex] } } @@ -883,6 +886,18 @@ type PhysicalExchangeReceiver struct { basePhysicalPlan Tasks []*kv.MPPTask + frags []*Fragment +} + +// Clone implment PhysicalPlan interface. +func (p *PhysicalExchangeReceiver) Clone() (PhysicalPlan, error) { + np := new(PhysicalExchangeReceiver) + base, err := p.basePhysicalPlan.cloneWithSelf(np) + if err != nil { + return nil, errors.Trace(err) + } + np.basePhysicalPlan = *base + return np, nil } // GetExchangeSender return the connected sender of this receiver. We assume that its child must be a receiver. @@ -897,10 +912,21 @@ type PhysicalExchangeSender struct { TargetTasks []*kv.MPPTask ExchangeType tipb.ExchangeType HashCols []*expression.Column - // Tasks is the mpp task for current PhysicalExchangeSender + // Tasks is the mpp task for current PhysicalExchangeSender. Tasks []*kv.MPPTask +} - Fragment *Fragment +// Clone implment PhysicalPlan interface. +func (p *PhysicalExchangeSender) Clone() (PhysicalPlan, error) { + np := new(PhysicalExchangeSender) + base, err := p.basePhysicalPlan.cloneWithSelf(np) + if err != nil { + return nil, errors.Trace(err) + } + np.basePhysicalPlan = *base + np.ExchangeType = p.ExchangeType + np.HashCols = p.HashCols + return np, nil } // Clone implements PhysicalPlan interface. @@ -951,6 +977,19 @@ func (p *PhysicalLimit) Clone() (PhysicalPlan, error) { // PhysicalUnionAll is the physical operator of UnionAll. type PhysicalUnionAll struct { physicalSchemaProducer + + mpp bool +} + +// Clone implements PhysicalPlan interface. 
+func (p *PhysicalUnionAll) Clone() (PhysicalPlan, error) { + cloned := new(PhysicalUnionAll) + base, err := p.physicalSchemaProducer.cloneWithSelf(cloned) + if err != nil { + return nil, err + } + cloned.physicalSchemaProducer = *base + return cloned, nil } // AggMppRunMode defines the running mode of aggregation in MPP diff --git a/planner/core/plan.go b/planner/core/plan.go index 2d38bfc375b65..f1449491c3cab 100644 --- a/planner/core/plan.go +++ b/planner/core/plan.go @@ -391,6 +391,9 @@ func (p *basePhysicalPlan) cloneWithSelf(newSelf PhysicalPlan) (*basePhysicalPla base.children = append(base.children, cloned) } for _, prop := range p.childrenReqProps { + if prop == nil { + continue + } base.childrenReqProps = append(base.childrenReqProps, prop.CloneEssentialFields()) } return base, nil diff --git a/planner/core/rule_inject_extra_projection.go b/planner/core/rule_inject_extra_projection.go index 2896a1dade0ff..968917a4fef2e 100644 --- a/planner/core/rule_inject_extra_projection.go +++ b/planner/core/rule_inject_extra_projection.go @@ -14,6 +14,7 @@ package core import ( + "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/kv" @@ -62,10 +63,42 @@ func (pe *projInjector) inject(plan PhysicalPlan) PhysicalPlan { plan = InjectProjBelowSort(p, p.ByItems) case *NominalSort: plan = TurnNominalSortIntoProj(p, p.OnlyColumn, p.ByItems) + case *PhysicalUnionAll: + plan = injectProjBelowUnion(p) } return plan } +func injectProjBelowUnion(un *PhysicalUnionAll) *PhysicalUnionAll { + if !un.mpp { + return un + } + for i, ch := range un.children { + exprs := make([]expression.Expression, len(ch.Schema().Columns)) + needChange := false + for i, dstCol := range un.schema.Columns { + dstType := dstCol.RetType + srcCol := ch.Schema().Columns[i] + srcType := srcCol.RetType + if !srcType.Equal(dstType) || !(mysql.HasNotNullFlag(dstType.Flag) == mysql.HasNotNullFlag(srcType.Flag)) { + exprs[i] = expression.BuildCastFunction4Union(un.ctx, srcCol, dstType) + needChange = true + } else { + exprs[i] = srcCol + } + } + if needChange { + proj := PhysicalProjection{ + Exprs: exprs, + }.Init(un.ctx, ch.statsInfo(), 0) + proj.SetSchema(un.schema.Clone()) + proj.SetChildren(ch) + un.children[i] = proj + } + } + return un +} + // wrapCastForAggFunc wraps the args of an aggregate function with a cast function. // If the mode is FinalMode or Partial2Mode, we do not need to wrap cast upon the args, // since the types of the args are already the expected. diff --git a/planner/core/task.go b/planner/core/task.go index 06cfcb1aa2878..2414bd37bc57d 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1265,7 +1265,31 @@ func (p *PhysicalProjection) attach2Task(tasks ...task) task { return t } +func (p *PhysicalUnionAll) attach2MppTasks(tasks ...task) task { + t := &mppTask{p: p} + childPlans := make([]PhysicalPlan, 0, len(tasks)) + var childMaxCost float64 + for _, tk := range tasks { + if mpp, ok := tk.(*mppTask); ok && !tk.invalid() { + childCost := mpp.cost() + if childCost > childMaxCost { + childMaxCost = childCost + } + childPlans = append(childPlans, mpp.plan()) + } else { + return invalidTask + } + } + p.SetChildren(childPlans...) + t.cst = childMaxCost + p.cost = t.cost() + return t +} + func (p *PhysicalUnionAll) attach2Task(tasks ...task) task { + if _, ok := tasks[0].(*mppTask); ok { + return p.attach2MppTasks(tasks...) 
+ } t := &rootTask{p: p} childPlans := make([]PhysicalPlan, 0, len(tasks)) var childMaxCost float64 diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 13488ae5b3e03..b117182c2d3b3 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -240,6 +240,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, realResp := rpcResp.Resp.(*mpp.DispatchTaskResponse) if realResp.Error != nil { + logutil.BgLogger().Error("mpp dispatch response meet error", zap.String("error", realResp.Error.Msg)) m.sendError(errors.New(realResp.Error.Msg)) return } From db67166e6270053a94ec7f41e266704ce3376827 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Fri, 4 Jun 2021 17:12:44 +0800 Subject: [PATCH 2/7] resolve conflict --- planner/core/exhaust_physical_plans.go | 19 +++++-------------- planner/core/task.go | 1 - 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index cb656b39e5edb..eab808527d21f 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -2492,19 +2492,14 @@ func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { // TODO: UnionAll can not pass any order, but we can change it to sort merge to keep order. -<<<<<<< HEAD - if !prop.IsEmpty() || prop.IsFlashProp() { - return nil, true -======= if !prop.IsEmpty() || (prop.IsFlashProp() && prop.TaskTp != property.MppTaskType) { - return nil, true, nil + return nil, true } // TODO: UnionAll can pass partition info, but for briefness, we prevent it from pushing down. if prop.TaskTp == property.MppTaskType && prop.PartitionTp != property.AnyType { - return nil, true, nil ->>>>>>> 52e89cb8b... planner/core: support union all for mpp. (#24287) + return nil, true } - canUseMpp := p.ctx.GetSessionVars().IsMPPAllowed() && p.canPushToCop(kv.TiFlash) + canUseMpp := p.ctx.GetSessionVars().AllowMPPExecution && p.canPushToCop(kv.TiFlash) chReqProps := make([]*property.PhysicalProperty, 0, len(p.children)) for range p.children { if canUseMpp && prop.TaskTp == property.MppTaskType { @@ -2520,9 +2515,6 @@ func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty) mpp: canUseMpp && prop.TaskTp == property.MppTaskType, }.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...) ua.SetSchema(p.Schema()) -<<<<<<< HEAD - return []PhysicalPlan{ua}, true -======= if canUseMpp && prop.TaskTp == property.RootTaskType { chReqProps = make([]*property.PhysicalProperty, 0, len(p.children)) for range p.children { @@ -2533,10 +2525,9 @@ func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty) } mppUA := PhysicalUnionAll{mpp: true}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...) mppUA.SetSchema(p.Schema()) - return []PhysicalPlan{ua, mppUA}, true, nil + return []PhysicalPlan{ua, mppUA}, true } - return []PhysicalPlan{ua}, true, nil ->>>>>>> 52e89cb8b... planner/core: support union all for mpp. 
(#24287) + return []PhysicalPlan{ua}, true } func (p *LogicalPartitionUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { diff --git a/planner/core/task.go b/planner/core/task.go index 2414bd37bc57d..0eeeccda834cb 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1282,7 +1282,6 @@ func (p *PhysicalUnionAll) attach2MppTasks(tasks ...task) task { } p.SetChildren(childPlans...) t.cst = childMaxCost - p.cost = t.cost() return t } From 241910291f830ee93c84e4fd1c39c9345f91517f Mon Sep 17 00:00:00 2001 From: Han Fei Date: Thu, 3 Jun 2021 21:36:26 +0800 Subject: [PATCH 3/7] planner: support explain analyze for mpp task with union (#24898) (#25081) --- distsql/select_result.go | 52 ++++++++++++++++++++++++++------- util/execdetails/execdetails.go | 3 +- 2 files changed, 43 insertions(+), 12 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index 9e37a02796f4f..05d7c41cf157b 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -295,13 +295,6 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" { return } - if len(r.selectResp.GetExecutionSummaries()) != len(r.copPlanIDs) { - logutil.Logger(ctx).Error("invalid cop task execution summaries length", - zap.Int("expected", len(r.copPlanIDs)), - zap.Int("received", len(r.selectResp.GetExecutionSummaries()))) - - return - } if r.stats == nil { id := r.rootPlanID r.stats = &selectResultRuntimeStats{ @@ -316,12 +309,49 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordScanDetail(r.copPlanIDs[len(r.copPlanIDs)-1], r.storeType.Name(), copStats.ScanDetail) } - for i, detail := range r.selectResp.GetExecutionSummaries() { + // If hasExecutor is true, it means the summary is returned from TiFlash. + hasExecutor := false + for _, detail := range r.selectResp.GetExecutionSummaries() { if detail != nil && detail.TimeProcessedNs != nil && detail.NumProducedRows != nil && detail.NumIterations != nil { - planID := r.copPlanIDs[i] - r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl. - RecordOneCopTask(planID, r.storeType.Name(), callee, detail) + if detail.ExecutorId != nil { + hasExecutor = true + } + break + } + } + if hasExecutor { + var recorededPlanIDs = make(map[int]int) + for i, detail := range r.selectResp.GetExecutionSummaries() { + if detail != nil && detail.TimeProcessedNs != nil && + detail.NumProducedRows != nil && detail.NumIterations != nil { + planID := r.copPlanIDs[i] + recorededPlanIDs[r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl. + RecordOneCopTask(planID, r.storeType.Name(), callee, detail)] = 0 + } + } + num := uint64(0) + dummySummary := &tipb.ExecutorExecutionSummary{TimeProcessedNs: &num, NumProducedRows: &num, NumIterations: &num, ExecutorId: nil} + for _, planID := range r.copPlanIDs { + if _, ok := recorededPlanIDs[planID]; !ok { + r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneCopTask(planID, r.storeType.Name(), callee, dummySummary) + } + } + } else { + // For cop task cases, we still need this protection. 
+ if len(r.selectResp.GetExecutionSummaries()) != len(r.copPlanIDs) { + logutil.Logger(ctx).Error("invalid cop task execution summaries length", + zap.Int("expected", len(r.copPlanIDs)), + zap.Int("received", len(r.selectResp.GetExecutionSummaries()))) + return + } + for i, detail := range r.selectResp.GetExecutionSummaries() { + if detail != nil && detail.TimeProcessedNs != nil && + detail.NumProducedRows != nil && detail.NumIterations != nil { + planID := r.copPlanIDs[i] + r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl. + RecordOneCopTask(planID, r.storeType.Name(), callee, detail) + } } } } diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 322b639f3c6e5..9a7d424e7956a 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -868,7 +868,7 @@ func getPlanIDFromExecutionSummary(summary *tipb.ExecutorExecutionSummary) (int, } // RecordOneCopTask records a specific cop tasks's execution detail. -func (e *RuntimeStatsColl) RecordOneCopTask(planID int, storeType string, address string, summary *tipb.ExecutorExecutionSummary) { +func (e *RuntimeStatsColl) RecordOneCopTask(planID int, storeType string, address string, summary *tipb.ExecutorExecutionSummary) int { // for TiFlash cop response, ExecutorExecutionSummary contains executor id, so if there is a valid executor id in // summary, use it overwrite the planID if id, valid := getPlanIDFromExecutionSummary(summary); valid { @@ -876,6 +876,7 @@ func (e *RuntimeStatsColl) RecordOneCopTask(planID int, storeType string, addres } copStats := e.GetOrCreateCopStats(planID, storeType) copStats.RecordOneCopTask(address, summary) + return planID } // RecordScanDetail records a specific cop tasks's cop detail. From cd8fe220fceacb16c6fab9c00efb5b5459e34324 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Fri, 11 Jun 2021 12:24:35 +0800 Subject: [PATCH 4/7] planner/core: fix bug that injected proj get wrong index. 
(#25336) --- executor/tiflash_test.go | 11 +++++++++++ planner/core/rule_inject_extra_projection.go | 1 + 2 files changed, 12 insertions(+) diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index 6ec5530cd3819..923eb74167155 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -466,6 +466,17 @@ func (s *tiflashTestSuite) TestMppUnionAll(c *C) { tk.MustQuery("select count(*) from (select * from x1 union all select * from x2 union all select * from x3) x join (select * from x1 union all select * from x2 union all select * from x3) y on x.a = y.b").Check(testkit.Rows("29")) failpoint.Disable("github.com/pingcap/tidb/executor/checkTotalMPPTasks") + tk.MustExec("drop table if exists x4") + tk.MustExec("create table x4(a int not null, b int not null);") + tk.MustExec("alter table x4 set tiflash replica 1") + tb = testGetTableByName(c, tk.Se, "test", "x4") + err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) + c.Assert(err, IsNil) + + tk.MustExec("set @@tidb_enforce_mpp=1") + tk.MustExec("insert into x4 values (2, 2), (2, 3)") + tk.MustQuery("(select * from x1 union all select * from x4) order by a, b").Check(testkit.Rows("1 1", "2 2", "2 2", "2 3", "3 3", "4 4")) + } func (s *tiflashTestSuite) TestMppApply(c *C) { diff --git a/planner/core/rule_inject_extra_projection.go b/planner/core/rule_inject_extra_projection.go index 968917a4fef2e..911c531ceb4f0 100644 --- a/planner/core/rule_inject_extra_projection.go +++ b/planner/core/rule_inject_extra_projection.go @@ -79,6 +79,7 @@ func injectProjBelowUnion(un *PhysicalUnionAll) *PhysicalUnionAll { for i, dstCol := range un.schema.Columns { dstType := dstCol.RetType srcCol := ch.Schema().Columns[i] + srcCol.Index = i srcType := srcCol.RetType if !srcType.Equal(dstType) || !(mysql.HasNotNullFlag(dstType.Flag) == mysql.HasNotNullFlag(srcType.Flag)) { exprs[i] = expression.BuildCastFunction4Union(un.ctx, srcCol, dstType) From 9723a2fdbe3029d533412d396a6b0cdb821295f7 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Fri, 18 Jun 2021 16:33:08 +0800 Subject: [PATCH 5/7] fix test --- executor/tiflash_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index 923eb74167155..a95ab90051e2e 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -473,7 +473,6 @@ func (s *tiflashTestSuite) TestMppUnionAll(c *C) { err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) c.Assert(err, IsNil) - tk.MustExec("set @@tidb_enforce_mpp=1") tk.MustExec("insert into x4 values (2, 2), (2, 3)") tk.MustQuery("(select * from x1 union all select * from x4) order by a, b").Check(testkit.Rows("1 1", "2 2", "2 2", "2 3", "3 3", "4 4")) From 9e07a81f397374bb8aea812955dc4c2da2c29e77 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Fri, 18 Jun 2021 19:10:22 +0800 Subject: [PATCH 6/7] fix test --- executor/tiflash_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index 6c09fb25b87fd..92141e13a116b 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -462,6 +462,8 @@ func (s *tiflashTestSuite) TestMppUnionAll(c *C) { tb = testGetTableByName(c, tk.Se, "test", "x2") err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) c.Assert(err, IsNil) + + tk.MustExec("set @@session.tidb_opt_mpp_outer_join_fixed_build_side=1") tk.MustExec("insert into x1 values (1, 1), (2, 2), (3, 3), (4, 4)") tk.MustExec("insert into x2 values (5, 
1), (2, 2), (3, 3), (4, 4)") From 06dc542b54520d8d2c9e28fbf884748000e7de28 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Fri, 18 Jun 2021 21:50:55 +0800 Subject: [PATCH 7/7] fix fmt --- executor/tiflash_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index 92141e13a116b..2727bdc15fb56 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -462,7 +462,7 @@ func (s *tiflashTestSuite) TestMppUnionAll(c *C) { tb = testGetTableByName(c, tk.Se, "test", "x2") err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) c.Assert(err, IsNil) - + tk.MustExec("set @@session.tidb_opt_mpp_outer_join_fixed_build_side=1") tk.MustExec("insert into x1 values (1, 1), (2, 2), (3, 3), (4, 4)")
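
The comment added to planner/core/fragment.go in PATCH 1/7 explains that union-all operators are removed by "untwisting" the plan: every root-to-leaf route becomes its own fragment. The following is a minimal, self-contained Go sketch of that forking idea only; the plan type and the untwist function here are invented for illustration and are not TiDB's PhysicalPlan API or the actual untwistPlanAndRemoveUnionAll implementation.

package main

import "fmt"

// plan is a tiny stand-in for a physical operator, just enough structure to
// show the idea; it is not TiDB's PhysicalPlan interface.
type plan struct {
	name     string
	isUnion  bool
	children []*plan
}

// untwist walks from the root (an exchange sender) down to every leaf and
// records one root-to-leaf route per union branch, dropping the UnionAll
// nodes themselves, the way the real code forks the plan at each union child.
func untwist(stack []*plan, forest *[][]string) {
	cur := stack[len(stack)-1]
	if len(cur.children) == 0 { // leaf: materialize the route collected so far
		route := make([]string, 0, len(stack))
		for _, p := range stack {
			if p.isUnion {
				continue // union operators disappear from the copied route
			}
			route = append(route, p.name)
		}
		*forest = append(*forest, route)
		return
	}
	for _, ch := range cur.children {
		untwist(append(stack, ch), forest)
	}
}

func main() {
	// Roughly "select c1 from t union all select c1 from s", as in the
	// comment added to planner/core/fragment.go.
	root := &plan{name: "ExchangeSender", children: []*plan{
		{name: "UnionAll", isUnion: true, children: []*plan{
			{name: "Projection", children: []*plan{{name: "TableScan(t)"}}},
			{name: "Projection", children: []*plan{{name: "TableScan(s)"}}},
		}},
	}}
	var forest [][]string
	untwist([]*plan{root}, &forest)
	for i, route := range forest {
		fmt.Println("fragment", i, "->", route)
	}
}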
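
injectProjBelowUnion in PATCH 1/7 (refined by the index fix in PATCH 4/7) wraps a child column in a cast whenever its type or NOT NULL flag differs from the union's result schema. Below is a rough sketch of just that comparison, assuming a made-up colType struct in place of TiDB's types.FieldType; it is not the planner's code path.

package main

import "fmt"

// colType is a toy column type: just a type name and a NOT NULL flag, which is
// all the injected projection compares (type equality plus the not-null flag).
type colType struct {
	tp      string
	notNull bool
}

// castsNeeded reports, per column, whether the child's output has to be wrapped
// in a cast so that it matches the union's result schema exactly.
func castsNeeded(unionSchema, childSchema []colType) []bool {
	casts := make([]bool, len(unionSchema))
	for i, dst := range unionSchema {
		src := childSchema[i]
		casts[i] = src.tp != dst.tp || src.notNull != dst.notNull
	}
	return casts
}

func main() {
	// Shape of "select * from x1 union all select * from x4" from the tests:
	// x1 columns are nullable ints, x4 columns are declared NOT NULL, so the
	// union result is nullable and only the x4 child needs casts injected.
	unionSchema := []colType{{tp: "int"}, {tp: "int"}}
	x1 := []colType{{tp: "int"}, {tp: "int"}}
	x4 := []colType{{tp: "int", notNull: true}, {tp: "int", notNull: true}}

	fmt.Println("x1 child casts:", castsNeeded(unionSchema, x1)) // [false false]
	fmt.Println("x4 child casts:", castsNeeded(unionSchema, x4)) // [true true]
}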
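
PATCH 3/7 records TiFlash execution summaries, which carry their own executor IDs, and then fills every plan ID that received nothing with a zero-valued dummy summary so EXPLAIN ANALYZE still shows a row per operator. Here is a stand-alone sketch of that padding step, with invented summary and padSummaries names rather than the tipb/RuntimeStatsColl types used above.

package main

import "fmt"

// summary is a toy stand-in for tipb.ExecutorExecutionSummary.
type summary struct {
	timeNs uint64
	rows   uint64
}

// padSummaries keeps every summary that was actually reported (keyed by plan ID)
// and gives all remaining plan IDs a zero-valued dummy entry, mirroring the
// dummySummary loop added to updateCopRuntimeStats for the TiFlash case.
func padSummaries(copPlanIDs []int, reported map[int]summary) map[int]summary {
	out := make(map[int]summary, len(copPlanIDs))
	for id, s := range reported {
		out[id] = s
	}
	for _, id := range copPlanIDs {
		if _, ok := out[id]; !ok {
			out[id] = summary{} // dummy entry so the operator still shows up
		}
	}
	return out
}

func main() {
	copPlanIDs := []int{101, 102, 103}
	reported := map[int]summary{101: {timeNs: 1200, rows: 8}} // 102 and 103 ran in another fragment
	padded := padSummaries(copPlanIDs, reported)
	for _, id := range copPlanIDs {
		fmt.Printf("plan %d: %+v\n", id, padded[id])
	}
}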