From 4b4e5ab74dc9d0157a3127a0c08bb809b5b42050 Mon Sep 17 00:00:00 2001 From: hanfei Date: Mon, 24 Apr 2017 15:48:54 +0800 Subject: [PATCH 1/2] plan: new plan supports join. --- plan/dag_plan_test.go | 57 +++++++++++++++++++++ plan/new_physical_plan_builder.go | 85 +++++++++++++++++++++++++++++++ plan/task_profile.go | 49 ++++++++++++++---- 3 files changed, 181 insertions(+), 10 deletions(-) diff --git a/plan/dag_plan_test.go b/plan/dag_plan_test.go index ec55320cb3317..b0dc21d6eac65 100644 --- a/plan/dag_plan_test.go +++ b/plan/dag_plan_test.go @@ -156,3 +156,60 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) { c.Assert(ToString(p), Equals, tt.best, Commentf("for %s", tt.sql)) } } + +func (s *testPlanSuite) TestDAGPlanBuilderJoin(c *C) { + UseDAGPlanBuilder = true + defer func() { + UseDAGPlanBuilder = false + testleak.AfterTest(c)() + }() + tests := []struct { + sql string + best string + }{ + { + sql: "select * from t t1 join t t2 on t1.a = t2.a join t t3 on t1.a = t3.a", + best: "LeftHashJoin{LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->TableReader(Table(t))}(t1.a,t3.a)", + }, + { + sql: "select * from t t1 join t t2 on t1.a = t2.a order by t1.a", + best: "LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->Sort", + }, + { + sql: "select * from t t1 left outer join t t2 on t1.a = t2.a right outer join t t3 on t1.a = t3.a", + best: "RightHashJoin{LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->TableReader(Table(t))}(t1.a,t3.a)", + }, + { + sql: "select * from t t1 join t t2 on t1.a = t2.a join t t3 on t1.a = t3.a and t1.b = 1 and t3.c = 1", + best: "LeftHashJoin{RightHashJoin{TableReader(Table(t)->Sel([eq(t1.b, 1)]))->TableReader(Table(t))}(t1.a,t2.a)->IndexLookUp(Index(t.c_d_e)[[1,1]], Table(t))}(t1.a,t3.a)", + }, + { + sql: "select * from t where t.c in (select b from t s where s.a = t.a)", + best: "SemiJoin{TableReader(Table(t))->TableReader(Table(t))}", + }, + { + sql: "select t.c in (select b from t s where s.a = t.a) from t", + best: "SemiJoinWithAux{TableReader(Table(t))->TableReader(Table(t))}->Projection", + }, + } + for _, tt := range tests { + comment := Commentf("for %s", tt.sql) + stmt, err := s.ParseOneStmt(tt.sql, "", "") + c.Assert(err, IsNil, comment) + + is, err := mockResolve(stmt) + c.Assert(err, IsNil) + + builder := &planBuilder{ + allocator: new(idAllocator), + ctx: mockContext(), + colMapper: make(map[*ast.ColumnNameExpr]int), + is: is, + } + p := builder.build(stmt) + c.Assert(builder.err, IsNil) + p, err = doOptimize(builder.optFlag, p.(LogicalPlan), builder.ctx, builder.allocator) + c.Assert(err, IsNil) + c.Assert(ToString(p), Equals, tt.best, Commentf("for %s", tt.sql)) + } +} diff --git a/plan/new_physical_plan_builder.go b/plan/new_physical_plan_builder.go index 34b38820d9afa..9b305681d2c55 100644 --- a/plan/new_physical_plan_builder.go +++ b/plan/new_physical_plan_builder.go @@ -98,6 +98,91 @@ func (p *Projection) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, e return task, p.storeTaskProfile(prop, task) } +// convert2NewPhysicalPlan implements PhysicalPlan interface. +// Join has three physical operators: Hash Join, Merge Join and Index Look Up Join. We implement Hash Join at first. 
+func (p *LogicalJoin) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) {
+	task, err := p.getTaskProfile(prop)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if task != nil {
+		return task, nil
+	}
+	switch p.JoinType {
+	case SemiJoin, LeftOuterSemiJoin:
+		task, err = p.convert2SemiJoin()
+	default:
+		// TODO: We will consider sort merge join and index lookup join in the future.
+		task, err = p.convert2HashJoin()
+	}
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	// Because hash join is executed by multiple goroutines, it does not propagate the physical property any more.
+	// TODO: We will revisit property propagation for parallel execution.
+	task = prop.enforceProperty(task, p.ctx, p.allocator)
+	return task, p.storeTaskProfile(prop, task)
+}
+
+func (p *LogicalJoin) convert2SemiJoin() (taskProfile, error) {
+	lChild := p.children[0].(LogicalPlan)
+	rChild := p.children[1].(LogicalPlan)
+	semiJoin := PhysicalHashSemiJoin{
+		WithAux:         LeftOuterSemiJoin == p.JoinType,
+		EqualConditions: p.EqualConditions,
+		LeftConditions:  p.LeftConditions,
+		RightConditions: p.RightConditions,
+		OtherConditions: p.OtherConditions,
+		Anti:            p.anti,
+	}.init(p.allocator, p.ctx)
+	semiJoin.SetSchema(p.schema)
+	lTask, err := lChild.convert2NewPhysicalPlan(&requiredProp{})
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	rTask, err := rChild.convert2NewPhysicalPlan(&requiredProp{})
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	return semiJoin.attach2TaskProfile(lTask, rTask), nil
+}
+
+func (p *LogicalJoin) convert2HashJoin() (taskProfile, error) {
+	lChild := p.children[0].(LogicalPlan)
+	rChild := p.children[1].(LogicalPlan)
+	hashJoin := PhysicalHashJoin{
+		EqualConditions: p.EqualConditions,
+		LeftConditions:  p.LeftConditions,
+		RightConditions: p.RightConditions,
+		OtherConditions: p.OtherConditions,
+		JoinType:        p.JoinType,
+		Concurrency:     JoinConcurrency,
+		DefaultValues:   p.DefaultValues,
+	}.init(p.allocator, p.ctx)
+	hashJoin.SetSchema(p.schema)
+	lTask, err := lChild.convert2NewPhysicalPlan(&requiredProp{})
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	rTask, err := rChild.convert2NewPhysicalPlan(&requiredProp{})
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	switch p.JoinType {
+	case LeftOuterJoin:
+		hashJoin.SmallTable = 1
+	case RightOuterJoin:
+		hashJoin.SmallTable = 0
+	case InnerJoin:
+		// Use the table with fewer rows as the small table; ties pick the right table.
+		if lTask.count() >= rTask.count() {
+			hashJoin.SmallTable = 1
+		}
+	}
+	return hashJoin.attach2TaskProfile(lTask, rTask), nil
+
+}
+
 // getPushedProp will check if this sort property can be pushed or not. In order to simplify the problem, we only
 // consider the case that all expression are columns and all of them are asc or desc.
 func (p *Sort) getPushedProp() (*requiredProp, bool) {
diff --git a/plan/task_profile.go b/plan/task_profile.go
index 85e20c980eccb..a76b1b90eba43 100644
--- a/plan/task_profile.go
+++ b/plan/task_profile.go
@@ -27,6 +27,7 @@ type taskProfile interface {
 	cost() float64
 	copy() taskProfile
 	plan() PhysicalPlan
+	finishTask(ctx context.Context, allocator *idAllocator) taskProfile
 }
 
 // TODO: In future, we should split copTask to indexTask and tableTask.
@@ -100,6 +101,37 @@ func (p *basePhysicalPlan) attach2TaskProfile(tasks ...taskProfile) taskProfile return attachPlan2TaskProfile(p.basePlan.self.(PhysicalPlan).Copy(), tasks[0]) } +func (p *PhysicalHashJoin) attach2TaskProfile(tasks ...taskProfile) taskProfile { + lTask := tasks[0].copy().finishTask(p.ctx, p.allocator) + rTask := tasks[1].copy().finishTask(p.ctx, p.allocator) + np := p.Copy() + np.SetChildren(lTask.plan(), rTask.plan()) + return &rootTaskProfile{ + p: np, + // TODO: we will estimate the cost and count more precisely. + cst: lTask.cost() + rTask.cost(), + cnt: lTask.count() + rTask.count(), + } +} + +func (p *PhysicalHashSemiJoin) attach2TaskProfile(tasks ...taskProfile) taskProfile { + lTask := tasks[0].copy().finishTask(p.ctx, p.allocator) + rTask := tasks[1].copy().finishTask(p.ctx, p.allocator) + np := p.Copy() + np.SetChildren(lTask.plan(), rTask.plan()) + task := &rootTaskProfile{ + p: np, + // TODO: we will estimate the cost and count more precisely. + cst: lTask.cost() + rTask.cost(), + } + if p.WithAux { + task.cnt = lTask.count() + } else { + task.cnt = lTask.count() * selectionFactor + } + return task +} + // finishTask means we close the coprocessor task and create a root task. func (t *copTaskProfile) finishTask(ctx context.Context, allocator *idAllocator) taskProfile { // FIXME: When it is a double reading. The cost should be more expensive. The right cost should add the @@ -132,6 +164,10 @@ type rootTaskProfile struct { cnt float64 } +func (t *rootTaskProfile) finishTask(_ context.Context, _ *idAllocator) taskProfile { + return t +} + func (t *rootTaskProfile) copy() taskProfile { return &rootTaskProfile{ p: t.p, @@ -212,9 +248,7 @@ func (p *Sort) attach2TaskProfile(profiles ...taskProfile) taskProfile { profile := profiles[0].copy() // If this is a Sort , we cannot push it down. if p.ExecLimit == nil { - if cop, ok := profile.(*copTaskProfile); ok { - profile = cop.finishTask(p.ctx, p.allocator) - } + profile = profile.finishTask(p.ctx, p.allocator) profile = attachPlan2TaskProfile(p.Copy(), profile) profile.addCost(p.getCost(profile.count())) return profile @@ -241,10 +275,8 @@ func (p *Sort) attach2TaskProfile(profiles ...taskProfile) taskProfile { } copTask.addCost(pushedDownTopN.getCost(profile.count())) copTask.setCount(float64(pushedDownTopN.ExecLimit.Count)) - profile = copTask.finishTask(p.ctx, p.allocator) - } else if ok { - profile = copTask.finishTask(p.ctx, p.allocator) } + profile = profile.finishTask(p.ctx, p.allocator) profile = attachPlan2TaskProfile(p.Copy(), profile) profile.addCost(p.getCost(profile.count())) profile.setCount(float64(p.ExecLimit.Count)) @@ -267,10 +299,7 @@ func (p *Projection) attach2TaskProfile(profiles ...taskProfile) taskProfile { } func (sel *Selection) attach2TaskProfile(profiles ...taskProfile) taskProfile { - profile := profiles[0].copy() - if cop, ok := profile.(*copTaskProfile); ok { - profile = cop.finishTask(sel.ctx, sel.allocator) - } + profile := profiles[0].copy().finishTask(sel.ctx, sel.allocator) profile.addCost(profile.count() * cpuFactor) profile.setCount(profile.count() * selectionFactor) profile = attachPlan2TaskProfile(sel.Copy(), profile) From c45e3a7e7be3cb17ba35d67364e2f31481c090fe Mon Sep 17 00:00:00 2001 From: hanfei Date: Tue, 25 Apr 2017 17:07:19 +0800 Subject: [PATCH 2/2] make finishCopTask a plain function. 
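
finishTask is dropped from the taskProfile interface and replaced by the plain
function finishCopTask, which closes a copTaskProfile into a rootTaskProfile
and returns any other task unchanged, so callers no longer need a type
assertion before finishing a coprocessor task. For example, Selection's
attach2TaskProfile goes from

    profile := profiles[0].copy().finishTask(sel.ctx, sel.allocator)

to

    profile := finishCopTask(profiles[0].copy(), sel.ctx, sel.allocator)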
--- plan/new_physical_plan_builder.go | 2 +- plan/task_profile.go | 36 +++++++++++++++---------------- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/plan/new_physical_plan_builder.go b/plan/new_physical_plan_builder.go index c412c097055a7..01e21e277e9b5 100644 --- a/plan/new_physical_plan_builder.go +++ b/plan/new_physical_plan_builder.go @@ -231,7 +231,7 @@ func (p *Sort) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) limit.SetSchema(p.schema) orderedTask = limit.attach2TaskProfile(orderedTask) } else if cop, ok := orderedTask.(*copTaskProfile); ok { - orderedTask = cop.finishTask(p.ctx, p.allocator) + orderedTask = finishCopTask(cop, p.ctx, p.allocator) } if orderedTask.cost() < task.cost() { task = orderedTask diff --git a/plan/task_profile.go b/plan/task_profile.go index 327d44ee1b1f0..51b2227ec9211 100644 --- a/plan/task_profile.go +++ b/plan/task_profile.go @@ -27,7 +27,6 @@ type taskProfile interface { cost() float64 copy() taskProfile plan() PhysicalPlan - finishTask(ctx context.Context, allocator *idAllocator) taskProfile } // TODO: In future, we should split copTask to indexTask and tableTask. @@ -98,14 +97,13 @@ func (t *copTaskProfile) finishIndexPlan() { } func (p *basePhysicalPlan) attach2TaskProfile(tasks ...taskProfile) taskProfile { - profile := tasks[0].copy() - profile = profile.finishTask(p.basePlan.ctx, p.basePlan.allocator) + profile := finishCopTask(tasks[0].copy(), p.basePlan.ctx, p.basePlan.allocator) return attachPlan2TaskProfile(p.basePlan.self.(PhysicalPlan).Copy(), profile) } func (p *PhysicalHashJoin) attach2TaskProfile(tasks ...taskProfile) taskProfile { - lTask := tasks[0].copy().finishTask(p.ctx, p.allocator) - rTask := tasks[1].copy().finishTask(p.ctx, p.allocator) + lTask := finishCopTask(tasks[0].copy(), p.ctx, p.allocator) + rTask := finishCopTask(tasks[1].copy(), p.ctx, p.allocator) np := p.Copy() np.SetChildren(lTask.plan(), rTask.plan()) return &rootTaskProfile{ @@ -117,8 +115,8 @@ func (p *PhysicalHashJoin) attach2TaskProfile(tasks ...taskProfile) taskProfile } func (p *PhysicalHashSemiJoin) attach2TaskProfile(tasks ...taskProfile) taskProfile { - lTask := tasks[0].copy().finishTask(p.ctx, p.allocator) - rTask := tasks[1].copy().finishTask(p.ctx, p.allocator) + lTask := finishCopTask(tasks[0].copy(), p.ctx, p.allocator) + rTask := finishCopTask(tasks[1].copy(), p.ctx, p.allocator) np := p.Copy() np.SetChildren(lTask.plan(), rTask.plan()) task := &rootTaskProfile{ @@ -134,8 +132,12 @@ func (p *PhysicalHashSemiJoin) attach2TaskProfile(tasks ...taskProfile) taskProf return task } -// finishTask means we close the coprocessor task and create a root task. -func (t *copTaskProfile) finishTask(ctx context.Context, allocator *idAllocator) taskProfile { +// finishCopTask means we close the coprocessor task and create a root task. +func finishCopTask(task taskProfile, ctx context.Context, allocator *idAllocator) taskProfile { + t, ok := task.(*copTaskProfile) + if !ok { + return task + } // FIXME: When it is a double reading. The cost should be more expensive. 
The right cost should add the // `NetWorkStartCost` * (totalCount / perCountIndexRead) t.finishIndexPlan() @@ -166,10 +168,6 @@ type rootTaskProfile struct { cnt float64 } -func (t *rootTaskProfile) finishTask(_ context.Context, _ *idAllocator) taskProfile { - return t -} - func (t *rootTaskProfile) copy() taskProfile { return &rootTaskProfile{ p: t.p, @@ -211,7 +209,7 @@ func (p *Limit) attach2TaskProfile(profiles ...taskProfile) taskProfile { } cop = attachPlan2TaskProfile(pushedDownLimit, cop).(*copTaskProfile) cop.setCount(float64(pushedDownLimit.Count)) - profile = cop.finishTask(p.ctx, p.allocator) + profile = finishCopTask(cop, p.ctx, p.allocator) } profile = attachPlan2TaskProfile(p.Copy(), profile) profile.setCount(float64(p.Count)) @@ -250,7 +248,7 @@ func (p *Sort) attach2TaskProfile(profiles ...taskProfile) taskProfile { profile := profiles[0].copy() // If this is a Sort , we cannot push it down. if p.ExecLimit == nil { - profile = profile.finishTask(p.ctx, p.allocator) + profile = finishCopTask(profile, p.ctx, p.allocator) profile = attachPlan2TaskProfile(p.Copy(), profile) profile.addCost(p.getCost(profile.count())) return profile @@ -278,7 +276,7 @@ func (p *Sort) attach2TaskProfile(profiles ...taskProfile) taskProfile { copTask.addCost(pushedDownTopN.getCost(profile.count())) copTask.setCount(float64(pushedDownTopN.ExecLimit.Count)) } - profile = profile.finishTask(p.ctx, p.allocator) + profile = finishCopTask(profile, p.ctx, p.allocator) profile = attachPlan2TaskProfile(p.Copy(), profile) profile.addCost(p.getCost(profile.count())) profile.setCount(float64(p.ExecLimit.Count)) @@ -291,7 +289,7 @@ func (p *Projection) attach2TaskProfile(profiles ...taskProfile) taskProfile { switch t := profile.(type) { case *copTaskProfile: // TODO: Support projection push down. - task := t.finishTask(p.ctx, p.allocator) + task := finishCopTask(profile, p.ctx, p.allocator) profile = attachPlan2TaskProfile(np, task) return profile case *rootTaskProfile: @@ -305,7 +303,7 @@ func (p *Union) attach2TaskProfile(profiles ...taskProfile) taskProfile { newTask := &rootTaskProfile{p: np} newChildren := make([]Plan, 0, len(p.children)) for _, profile := range profiles { - profile = profile.finishTask(p.ctx, p.allocator) + profile = finishCopTask(profile, p.ctx, p.allocator) newTask.cst += profile.cost() newTask.cnt += profile.count() newChildren = append(newChildren, profile.plan()) @@ -315,7 +313,7 @@ func (p *Union) attach2TaskProfile(profiles ...taskProfile) taskProfile { } func (sel *Selection) attach2TaskProfile(profiles ...taskProfile) taskProfile { - profile := profiles[0].copy().finishTask(sel.ctx, sel.allocator) + profile := finishCopTask(profiles[0].copy(), sel.ctx, sel.allocator) profile.addCost(profile.count() * cpuFactor) profile.setCount(profile.count() * selectionFactor) profile = attachPlan2TaskProfile(sel.Copy(), profile)