Skip to content

Commit

Permalink
planner: reserve the OFFSET when outer join's inner side is unique (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros authored Nov 13, 2024
1 parent 66cb425 commit 97e7b5c
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 19 deletions.
38 changes: 25 additions & 13 deletions pkg/expression/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,22 +128,34 @@ func (s *Schema) RetrieveColumn(col *Column) *Column {
return nil
}

// IsUniqueKey checks if this column is a unique key.
func (s *Schema) IsUniqueKey(col *Column) bool {
for _, key := range s.Keys {
if len(key) == 1 && key[0].EqualColumn(col) {
return true
}
// IsUnique checks if the column is unique key.
// Pass strong=true to check strong contraint: unique && notnull.
// Pass strong=false to check weak contraint: unique && nullable.
func (s *Schema) IsUnique(strong bool, cols ...*Column) bool {
slicesToBeIterated := s.UniqueKeys
if strong {
slicesToBeIterated = s.Keys
}
return false
}
for _, key := range slicesToBeIterated {
if len(key) > len(cols) {
continue
}
allFound := true
nextKeyCol:

// IsUnique checks if this column is a unique key which may contain duplicate nulls .
func (s *Schema) IsUnique(col *Column) bool {
for _, key := range s.UniqueKeys {
if len(key) == 1 && key[0].EqualColumn(col) {
return true
for _, keyCols := range key {
for _, col := range cols {
if keyCols.EqualColumn(col) {
continue nextKeyCol
}
}
allFound = false
break
}
if !allFound {
continue
}
return true
}
return false
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/expression/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ func TestSchemaIsUniqueKey(t *testing.T) {
}
for i, col := range schema.Columns {
if i < len(schema.Columns)-1 {
require.Equal(t, true, schema.IsUniqueKey(col))
require.Equal(t, true, schema.IsUnique(true, col))
} else {
require.Equal(t, false, schema.IsUniqueKey(col))
require.Equal(t, false, schema.IsUnique(true, col))
}
}
require.Equal(t, false, schema.IsUniqueKey(colOutSchema))
require.Equal(t, false, schema.IsUnique(true, colOutSchema))
}

func TestSchemaContains(t *testing.T) {
Expand Down
36 changes: 35 additions & 1 deletion pkg/planner/core/operator/logicalop/logical_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,9 +1091,43 @@ func (p *LogicalJoin) pushDownTopNToChild(topN *LogicalTopN, idx int, opt *optim
}
}
}
count, offset := topN.Count+topN.Offset, uint64(0)
if p.JoinType == LeftOuterJoin {
innerChild := p.Children()[1]
innerJoinKey := make([]*expression.Column, 0, len(p.EqualConditions))
isNullEQ := false
for _, eqCond := range p.EqualConditions {
innerJoinKey = append(innerJoinKey, eqCond.GetArgs()[1].(*expression.Column))
if eqCond.FuncName.L == ast.NullEQ {
isNullEQ = true
}
}
// If it's unique key(unique with not null), we can push the offset down safely whatever the join key is normal eq or nulleq.
// If the join key is nulleq, then we can only push the offset down when the inner side is unique key.
// Only when the join key is normal eq, we can push the offset down when the inner side is unique(could be null).
if innerChild.Schema().IsUnique(true, innerJoinKey...) ||
(!isNullEQ && innerChild.Schema().IsUnique(false, innerJoinKey...)) {
count, offset = topN.Count, topN.Offset
}
} else if p.JoinType == RightOuterJoin {
innerChild := p.Children()[0]
innerJoinKey := make([]*expression.Column, 0, len(p.EqualConditions))
isNullEQ := false
for _, eqCond := range p.EqualConditions {
innerJoinKey = append(innerJoinKey, eqCond.GetArgs()[0].(*expression.Column))
if eqCond.FuncName.L == ast.NullEQ {
isNullEQ = true
}
}
if innerChild.Schema().IsUnique(true, innerJoinKey...) ||
(!isNullEQ && innerChild.Schema().IsUnique(false, innerJoinKey...)) {
count, offset = topN.Count, topN.Offset
}
}

newTopN := LogicalTopN{
Count: topN.Count + topN.Offset,
Count: count,
Offset: offset,
ByItems: make([]*util.ByItems, len(topN.ByItems)),
PreferLimitToCop: topN.PreferLimitToCop,
}.Init(topN.SCtx(), topN.QueryBlockOffset())
Expand Down
16 changes: 15 additions & 1 deletion pkg/planner/core/testdata/plan_suite_unexported_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,22 @@
"select a, b from (select @i as a, @i := @i+1 as b from t) t order by a desc limit 1",
// Test TopN + Left Join + Proj.
"select * from t left outer join t s on t.a = s.a order by t.a limit 5",
// Test TopN + Left Join + Proj.
// Test TopN(with offset) + Left Join + Proj, outer side join key is unique key and not null.
"select * from t left outer join t s on t.a = s.a order by t.a limit 5, 5",
// Test TopN(with offset) + Left Join + Proj, outer side join key is unique key and not null.
"select * from t left outer join t s on t.f = s.f and t.g = s.g order by t.f, t.g limit 5, 5",
// Test TopN(with offset) + Left Join + Proj, outer side join key is unique key and not null.
"select * from t left outer join t s on t.f = s.f and t.g = s.g order by t.g, t.f limit 5, 5",
// Test TopN(with offset) + Left Join + Proj, outer side join key is part of unique key and not null.
"select * from t left outer join t s on t.g = s.g order by t.g limit 5, 5",
// Test TopN(with offset) + Left Join + Proj, outer side join key is part of unique key and not null.
"select * from t left outer join t s on t.g = s.g order by t.f, t.g limit 5, 5",
// Test TopN(with offset) + Left Join + Proj, outer side join key is unique key(c, d, e) but nullable.
"select * from t left outer join t s on t.c = s.c and t.d=s.d and t.e=s.e order by t.c, t.d, t.e limit 5, 5",
// Test TopN(with offset) + Left Join + Proj, outer side join key is part of unique key(c, d, e) but nullable.
"select * from t left outer join t s on t.c = s.c and t.d=s.d order by t.c, t.d limit 5, 5",
// Test TopN(with offset) + Left Join + Proj, outer side join key is not unique.
"select * from t left outer join t s on t.b = s.b order by t.b limit 5, 5",
// Test Limit + Left Join + Proj.
"select * from t left outer join t s on t.a = s.a limit 5",
// Test Limit + Left Join Apply + Proj.
Expand Down
9 changes: 8 additions & 1 deletion pkg/planner/core/testdata/plan_suite_unexported_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,14 @@
"Join{DataScan(t)->DataScan(s)}->Limit->Projection",
"DataScan(t)->Projection->TopN([Column#13 true],0,1)->Projection",
"Join{DataScan(t)->TopN([test.t.a],0,5)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a],0,5)->Projection",
"Join{DataScan(t)->TopN([test.t.a],0,10)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.a],5,5)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.f test.t.g],5,5)->DataScan(s)}(test.t.f,test.t.f)(test.t.g,test.t.g)->TopN([test.t.f test.t.g],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.g test.t.f],5,5)->DataScan(s)}(test.t.f,test.t.f)(test.t.g,test.t.g)->TopN([test.t.g test.t.f],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.g],0,10)->DataScan(s)}(test.t.g,test.t.g)->TopN([test.t.g],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.f test.t.g],0,10)->DataScan(s)}(test.t.g,test.t.g)->TopN([test.t.f test.t.g],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.c test.t.d test.t.e],5,5)->DataScan(s)}(test.t.c,test.t.c)(test.t.d,test.t.d)(test.t.e,test.t.e)->TopN([test.t.c test.t.d test.t.e],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.c test.t.d],0,10)->DataScan(s)}(test.t.c,test.t.c)(test.t.d,test.t.d)->TopN([test.t.c test.t.d],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.b],0,10)->DataScan(s)}(test.t.b,test.t.b)->TopN([test.t.b],5,5)->Projection",
"Join{DataScan(t)->Limit->DataScan(s)}(test.t.a,test.t.a)->Limit->Projection",
"Join{DataScan(t)->Limit->DataScan(s)}(test.t.a,test.t.a)->Limit->Projection",
"Join{DataScan(t)->TopN([test.t.a],0,5)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a],0,5)->Projection",
Expand Down

0 comments on commit 97e7b5c

Please sign in to comment.