
plan: new plan supports join. #3126

Merged 5 commits on Apr 25, 2017
57 changes: 57 additions & 0 deletions plan/dag_plan_test.go
@@ -157,6 +157,63 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
	}
}

func (s *testPlanSuite) TestDAGPlanBuilderJoin(c *C) {
	UseDAGPlanBuilder = true
	defer func() {
		UseDAGPlanBuilder = false
		testleak.AfterTest(c)()
	}()
	tests := []struct {
		sql  string
		best string
	}{
		{
			sql:  "select * from t t1 join t t2 on t1.a = t2.a join t t3 on t1.a = t3.a",
			best: "LeftHashJoin{LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->TableReader(Table(t))}(t1.a,t3.a)",
		},
		{
			sql:  "select * from t t1 join t t2 on t1.a = t2.a order by t1.a",
			best: "LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->Sort",
		},
		{
			sql:  "select * from t t1 left outer join t t2 on t1.a = t2.a right outer join t t3 on t1.a = t3.a",
			best: "RightHashJoin{LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->TableReader(Table(t))}(t1.a,t3.a)",
		},
		{
			sql:  "select * from t t1 join t t2 on t1.a = t2.a join t t3 on t1.a = t3.a and t1.b = 1 and t3.c = 1",
			best: "LeftHashJoin{RightHashJoin{TableReader(Table(t)->Sel([eq(t1.b, 1)]))->TableReader(Table(t))}(t1.a,t2.a)->IndexLookUp(Index(t.c_d_e)[[1,1]], Table(t))}(t1.a,t3.a)",
		},
		{
			sql:  "select * from t where t.c in (select b from t s where s.a = t.a)",
			best: "SemiJoin{TableReader(Table(t))->TableReader(Table(t))}",
		},
		{
			sql:  "select t.c in (select b from t s where s.a = t.a) from t",
			best: "SemiJoinWithAux{TableReader(Table(t))->TableReader(Table(t))}->Projection",
		},
	}
	for _, tt := range tests {
		comment := Commentf("for %s", tt.sql)
		stmt, err := s.ParseOneStmt(tt.sql, "", "")
		c.Assert(err, IsNil, comment)

		is, err := mockResolve(stmt)
		c.Assert(err, IsNil)

		builder := &planBuilder{
			allocator: new(idAllocator),
			ctx:       mockContext(),
			colMapper: make(map[*ast.ColumnNameExpr]int),
			is:        is,
		}
		p := builder.build(stmt)
		c.Assert(builder.err, IsNil)
		p, err = doOptimize(builder.optFlag, p.(LogicalPlan), builder.ctx, builder.allocator)
		c.Assert(err, IsNil)
		c.Assert(ToString(p), Equals, tt.best, Commentf("for %s", tt.sql))
	}
}
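
For readers new to these tests, the expected-plan notation can be decoded from the cases themselves; the annotated entry below is illustrative only (the exact output grammar of ToString is defined elsewhere in the plan package):

	// Hypothetical table entry, annotated to explain the notation above:
	{
		sql: "select * from t t1 join t t2 on t1.a = t2.a",
		// LeftHashJoin{X->Y}(t1.a,t2.a) reads as: a hash join over the child
		// fragments X and Y with the equal-join key pair (t1.a, t2.a).
		// "->" stacks an operator on top of the plan to its left, and
		// TableReader(Table(t)) is a full table scan run in the coprocessor.
		best: "LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)",
	},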

func (s *testPlanSuite) TestDAGPlanBuilderBasePhysicalPlan(c *C) {
	UseDAGPlanBuilder = true
	defer func() {
87 changes: 86 additions & 1 deletion plan/new_physical_plan_builder.go
@@ -98,6 +98,91 @@ func (p *Projection) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) {
	return task, p.storeTaskProfile(prop, task)
}

// convert2NewPhysicalPlan implements the PhysicalPlan interface.
// Join has three physical operators: Hash Join, Merge Join and Index Lookup Join. We implement Hash Join first.
func (p *LogicalJoin) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) {
	task, err := p.getTaskProfile(prop)
	if err != nil {
		return nil, errors.Trace(err)
	}
	if task != nil {
		return task, nil
	}
	switch p.JoinType {
	case SemiJoin, LeftOuterSemiJoin:
		task, err = p.convert2SemiJoin()
	default:
		// TODO: We will consider merge join and index lookup join in the future.
		task, err = p.convert2HashJoin()
	}
	if err != nil {
		return nil, errors.Trace(err)
	}
	// Because hash join is executed by multiple goroutines, it no longer propagates any physical property.
	// TODO: We will revisit the property problem for parallel execution.
	task = prop.enforceProperty(task, p.ctx, p.allocator)
	return task, p.storeTaskProfile(prop, task)
}
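
This enforcement step is why the `order by t1.a` case in the new test above ends in `->Sort`: the hash join cannot promise the ordering itself. A minimal, self-contained sketch of the idea, assuming enforcement simply stacks a Sort on top when an order is required (all names here are illustrative, not the plan package's real API):

package main

import "fmt"

// plan is a toy stand-in for a physical plan node.
type plan struct {
	name  string
	child *plan
}

// enforceOrder mimics the idea behind prop.enforceProperty: a hash join runs
// across multiple goroutines and cannot preserve any row order, so a required
// ordering is satisfied by stacking an explicit Sort on top of the join task.
func enforceOrder(root *plan, requiredCols []string) *plan {
	if len(requiredCols) == 0 {
		return root // no required order, nothing to enforce
	}
	return &plan{name: "Sort", child: root}
}

func main() {
	join := &plan{name: "LeftHashJoin"}
	top := enforceOrder(join, []string{"t1.a"})
	fmt.Println(top.child.name + "->" + top.name) // prints: LeftHashJoin->Sort
}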

func (p *LogicalJoin) convert2SemiJoin() (taskProfile, error) {
	lChild := p.children[0].(LogicalPlan)
	rChild := p.children[1].(LogicalPlan)
	semiJoin := PhysicalHashSemiJoin{
		WithAux:         LeftOuterSemiJoin == p.JoinType,
		EqualConditions: p.EqualConditions,
		LeftConditions:  p.LeftConditions,
		RightConditions: p.RightConditions,
		OtherConditions: p.OtherConditions,
		Anti:            p.anti,
	}.init(p.allocator, p.ctx)
	semiJoin.SetSchema(p.schema)
	lTask, err := lChild.convert2NewPhysicalPlan(&requiredProp{})
	if err != nil {
		return nil, errors.Trace(err)
	}
	rTask, err := rChild.convert2NewPhysicalPlan(&requiredProp{})
	if err != nil {
		return nil, errors.Trace(err)
	}
	return semiJoin.attach2TaskProfile(lTask, rTask), nil
}

func (p *LogicalJoin) convert2HashJoin() (taskProfile, error) {
	lChild := p.children[0].(LogicalPlan)
	rChild := p.children[1].(LogicalPlan)
	hashJoin := PhysicalHashJoin{
		EqualConditions: p.EqualConditions,
		LeftConditions:  p.LeftConditions,
		RightConditions: p.RightConditions,
		OtherConditions: p.OtherConditions,
		JoinType:        p.JoinType,
		Concurrency:     JoinConcurrency,
		DefaultValues:   p.DefaultValues,
	}.init(p.allocator, p.ctx)
	hashJoin.SetSchema(p.schema)
	lTask, err := lChild.convert2NewPhysicalPlan(&requiredProp{})
	if err != nil {
		return nil, errors.Trace(err)
	}
	rTask, err := rChild.convert2NewPhysicalPlan(&requiredProp{})
	if err != nil {
		return nil, errors.Trace(err)
	}
	switch p.JoinType {
	case LeftOuterJoin:
		hashJoin.SmallTable = 1
	case RightOuterJoin:
		hashJoin.SmallTable = 0
	case InnerJoin:
		// We will use right table as small table.

Review comment (Member): What does this comment mean?

Reply (Member Author): If the right plan's row count is smaller than the left one's, we treat the right plan as the small table.

		if lTask.count() >= rTask.count() {
			hashJoin.SmallTable = 1
		}
	}
	return hashJoin.attach2TaskProfile(lTask, rTask), nil
}
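
The exchange above can be restated as a tiny standalone helper (hypothetical, not part of the PR; joinType is a plain string only to keep the sketch self-contained):

// chooseSmallTable returns the child index used as the hash join's build
// ("small") side: the inner side for outer joins, otherwise whichever side
// has the smaller estimated row count. Mirrors the switch above.
func chooseSmallTable(joinType string, lCount, rCount float64) int {
	switch joinType {
	case "LeftOuterJoin":
		return 1 // right child builds the hash table
	case "RightOuterJoin":
		return 0 // left child builds the hash table
	default: // inner join: pick the smaller side
		if lCount >= rCount {
			return 1
		}
		return 0
	}
}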

// getPushedProp checks whether this sort property can be pushed down or not. To simplify the problem, we only
// consider the case where all expressions are columns and all of them are asc or desc.
func (p *Sort) getPushedProp() (*requiredProp, bool) {
@@ -146,7 +231,7 @@ func (p *Sort) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) {
		limit.SetSchema(p.schema)
		orderedTask = limit.attach2TaskProfile(orderedTask)
	} else if cop, ok := orderedTask.(*copTaskProfile); ok {
-		orderedTask = cop.finishTask(p.ctx, p.allocator)
+		orderedTask = finishCopTask(cop, p.ctx, p.allocator)
	}
	if orderedTask.cost() < task.cost() {
		task = orderedTask
67 changes: 45 additions & 22 deletions plan/task_profile.go
@@ -103,15 +103,47 @@ func (t *copTaskProfile) finishIndexPlan() {
}

func (p *basePhysicalPlan) attach2TaskProfile(tasks ...taskProfile) taskProfile {
-	profile := tasks[0].copy()
-	if cop, ok := profile.(*copTaskProfile); ok {
-		profile = cop.finishTask(p.basePlan.ctx, p.basePlan.allocator)
-	}
+	profile := finishCopTask(tasks[0].copy(), p.basePlan.ctx, p.basePlan.allocator)
	return attachPlan2TaskProfile(p.basePlan.self.(PhysicalPlan).Copy(), profile)
}

-// finishTask means we close the coprocessor task and create a root task.
-func (t *copTaskProfile) finishTask(ctx context.Context, allocator *idAllocator) taskProfile {
+func (p *PhysicalHashJoin) attach2TaskProfile(tasks ...taskProfile) taskProfile {
+	lTask := finishCopTask(tasks[0].copy(), p.ctx, p.allocator)
+	rTask := finishCopTask(tasks[1].copy(), p.ctx, p.allocator)
+	np := p.Copy()
+	np.SetChildren(lTask.plan(), rTask.plan())
+	return &rootTaskProfile{
+		p: np,
+		// TODO: we will estimate the cost and count more precisely.
+		cst: lTask.cost() + rTask.cost(),
+		cnt: lTask.count() + rTask.count(),
+	}
+}
+
+func (p *PhysicalHashSemiJoin) attach2TaskProfile(tasks ...taskProfile) taskProfile {
+	lTask := finishCopTask(tasks[0].copy(), p.ctx, p.allocator)
+	rTask := finishCopTask(tasks[1].copy(), p.ctx, p.allocator)
+	np := p.Copy()
+	np.SetChildren(lTask.plan(), rTask.plan())
+	task := &rootTaskProfile{
+		p: np,
+		// TODO: we will estimate the cost and count more precisely.
+		cst: lTask.cost() + rTask.cost(),
+	}
+	if p.WithAux {
+		task.cnt = lTask.count()
+	} else {
+		task.cnt = lTask.count() * selectionFactor
+	}
+	return task
+}
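
The `cnt` estimates above read as follows: with an aux column (e.g. `select t.c in (...) from t`), every outer row produces exactly one output row with the boolean result attached, so the count is unchanged, while a filtering semi join keeps only a fraction of the outer rows. A hedged sketch, assuming a selectionFactor of 0.8 (an assumption; the real constant is defined elsewhere in this package):

// semiJoinCount mirrors the cnt estimate above (hypothetical helper).
func semiJoinCount(leftCount float64, withAux bool) float64 {
	const assumedSelectionFactor = 0.8 // assumption; see the plan package constants
	if withAux {
		return leftCount // one output row per outer row, plus a boolean aux column
	}
	return leftCount * assumedSelectionFactor // the semi join filters outer rows
}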

+// finishCopTask means we close the coprocessor task and create a root task.
+func finishCopTask(task taskProfile, ctx context.Context, allocator *idAllocator) taskProfile {
+	t, ok := task.(*copTaskProfile)
+	if !ok {
+		return task
+	}
	// FIXME: When this is a double read, the cost should be higher; it should also include
	// `NetWorkStartCost` * (totalCount / perCountIndexRead).
	t.finishIndexPlan()
@@ -183,7 +215,7 @@ func (p *Limit) attach2TaskProfile(profiles ...taskProfile) taskProfile {
		}
		cop = attachPlan2TaskProfile(pushedDownLimit, cop).(*copTaskProfile)
		cop.setCount(float64(pushedDownLimit.Count))
-		profile = cop.finishTask(p.ctx, p.allocator)
+		profile = finishCopTask(cop, p.ctx, p.allocator)
	}
	profile = attachPlan2TaskProfile(p.Copy(), profile)
	profile.setCount(float64(p.Count))
@@ -222,9 +254,7 @@ func (p *Sort) attach2TaskProfile(profiles ...taskProfile) taskProfile {
	profile := profiles[0].copy()
	// If this is a pure Sort without a limit, we cannot push it down.
	if p.ExecLimit == nil {
-		if cop, ok := profile.(*copTaskProfile); ok {
-			profile = cop.finishTask(p.ctx, p.allocator)
-		}
+		profile = finishCopTask(profile, p.ctx, p.allocator)
		profile = attachPlan2TaskProfile(p.Copy(), profile)
		profile.addCost(p.getCost(profile.count()))
		return profile
@@ -251,10 +281,8 @@
		}
		copTask.addCost(pushedDownTopN.getCost(profile.count()))
		copTask.setCount(float64(pushedDownTopN.ExecLimit.Count))
-		profile = copTask.finishTask(p.ctx, p.allocator)
-	} else if ok {
-		profile = copTask.finishTask(p.ctx, p.allocator)
	}
+	profile = finishCopTask(profile, p.ctx, p.allocator)
	profile = attachPlan2TaskProfile(p.Copy(), profile)
	profile.addCost(p.getCost(profile.count()))
	profile.setCount(float64(p.ExecLimit.Count))
@@ -267,7 +295,7 @@ func (p *Projection) attach2TaskProfile(profiles ...taskProfile) taskProfile {
	switch t := profile.(type) {
	case *copTaskProfile:
		// TODO: Support projection push down.
-		task := t.finishTask(p.ctx, p.allocator)
+		task := finishCopTask(profile, p.ctx, p.allocator)
		profile = attachPlan2TaskProfile(np, task)
		return profile
	case *rootTaskProfile:
@@ -281,9 +309,7 @@ func (p *Union) attach2TaskProfile(profiles ...taskProfile) taskProfile {
	newTask := &rootTaskProfile{p: np}
	newChildren := make([]Plan, 0, len(p.children))
	for _, profile := range profiles {
-		if cop, ok := profile.(*copTaskProfile); ok {
-			profile = cop.finishTask(p.ctx, p.allocator)
-		}
+		profile = finishCopTask(profile, p.ctx, p.allocator)

Review comment (Member): Why not check the task type here?

Reply (Member Author): The task-type check has moved into finishCopTask; a before/after sketch of the call sites follows this function.

		newTask.cst += profile.cost()
		newTask.cnt += profile.count()
		newChildren = append(newChildren, profile.plan())
@@ -293,10 +319,7 @@ func (p *Union) attach2TaskProfile(profiles ...taskProfile) taskProfile {
}
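
As promised in the reply above, a minimal before/after sketch of a call site, taken from this diff's own pattern:

// Before: each caller type-checked the task and closed cop tasks itself.
if cop, ok := profile.(*copTaskProfile); ok {
	profile = cop.finishTask(p.ctx, p.allocator)
}

// After: finishCopTask is a no-op on non-cop tasks, so callers invoke it unconditionally.
profile = finishCopTask(profile, p.ctx, p.allocator)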

func (sel *Selection) attach2TaskProfile(profiles ...taskProfile) taskProfile {
-	profile := profiles[0].copy()
-	if cop, ok := profile.(*copTaskProfile); ok {
-		profile = cop.finishTask(sel.ctx, sel.allocator)
-	}
+	profile := finishCopTask(profiles[0].copy(), sel.ctx, sel.allocator)
	profile.addCost(profile.count() * cpuFactor)
	profile.setCount(profile.count() * selectionFactor)
	profile = attachPlan2TaskProfile(sel.Copy(), profile)
@@ -378,7 +401,7 @@ func (p *PhysicalAggregation) attach2TaskProfile(profiles ...taskProfile) taskProfile {
			cop.cnt = cop.cnt * aggFactor
		}
	}
-	profile = cop.finishTask(p.ctx, p.allocator)
+	profile = finishCopTask(cop, p.ctx, p.allocator)

Review comment (Member): Need cop.copy()?

Reply (Member Author): No need; finishCopTask will not change the cop task.

		attachPlan2TaskProfile(finalAgg, profile)
	} else {
		np := p.Copy()