Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: reserve the OFFSET when outer join's inner side is unique (#56483) #57361

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions pkg/expression/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,22 +128,34 @@ func (s *Schema) RetrieveColumn(col *Column) *Column {
return nil
}

// IsUniqueKey checks if this column is a unique key.
func (s *Schema) IsUniqueKey(col *Column) bool {
for _, key := range s.Keys {
if len(key) == 1 && key[0].EqualColumn(col) {
return true
}
// IsUnique checks if the column is unique key.
// Pass strong=true to check strong contraint: unique && notnull.
// Pass strong=false to check weak contraint: unique && nullable.
func (s *Schema) IsUnique(strong bool, cols ...*Column) bool {
slicesToBeIterated := s.UniqueKeys
if strong {
slicesToBeIterated = s.Keys
}
return false
}
for _, key := range slicesToBeIterated {
if len(key) > len(cols) {
continue
}
allFound := true
nextKeyCol:

// IsUnique checks if this column is a unique key which may contain duplicate nulls .
func (s *Schema) IsUnique(col *Column) bool {
for _, key := range s.UniqueKeys {
if len(key) == 1 && key[0].EqualColumn(col) {
return true
for _, keyCols := range key {
for _, col := range cols {
if keyCols.EqualColumn(col) {
continue nextKeyCol
}
}
allFound = false
break
}
if !allFound {
continue
}
return true
}
return false
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/expression/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ func TestSchemaIsUniqueKey(t *testing.T) {
}
for i, col := range schema.Columns {
if i < len(schema.Columns)-1 {
require.Equal(t, true, schema.IsUniqueKey(col))
require.Equal(t, true, schema.IsUnique(true, col))
} else {
require.Equal(t, false, schema.IsUniqueKey(col))
require.Equal(t, false, schema.IsUnique(true, col))
}
}
require.Equal(t, false, schema.IsUniqueKey(colOutSchema))
require.Equal(t, false, schema.IsUnique(true, colOutSchema))
}

func TestSchemaContains(t *testing.T) {
Expand Down
36 changes: 35 additions & 1 deletion pkg/planner/core/operator/logicalop/logical_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,9 +1091,43 @@ func (p *LogicalJoin) pushDownTopNToChild(topN *LogicalTopN, idx int, opt *optim
}
}
}
count, offset := topN.Count+topN.Offset, uint64(0)
if p.JoinType == LeftOuterJoin {
innerChild := p.Children()[1]
innerJoinKey := make([]*expression.Column, 0, len(p.EqualConditions))
isNullEQ := false
for _, eqCond := range p.EqualConditions {
innerJoinKey = append(innerJoinKey, eqCond.GetArgs()[1].(*expression.Column))
if eqCond.FuncName.L == ast.NullEQ {
isNullEQ = true
}
}
// If it's unique key(unique with not null), we can push the offset down safely whatever the join key is normal eq or nulleq.
// If the join key is nulleq, then we can only push the offset down when the inner side is unique key.
// Only when the join key is normal eq, we can push the offset down when the inner side is unique(could be null).
if innerChild.Schema().IsUnique(true, innerJoinKey...) ||
(!isNullEQ && innerChild.Schema().IsUnique(false, innerJoinKey...)) {
count, offset = topN.Count, topN.Offset
}
} else if p.JoinType == RightOuterJoin {
innerChild := p.Children()[0]
innerJoinKey := make([]*expression.Column, 0, len(p.EqualConditions))
isNullEQ := false
for _, eqCond := range p.EqualConditions {
innerJoinKey = append(innerJoinKey, eqCond.GetArgs()[0].(*expression.Column))
if eqCond.FuncName.L == ast.NullEQ {
isNullEQ = true
}
}
if innerChild.Schema().IsUnique(true, innerJoinKey...) ||
(!isNullEQ && innerChild.Schema().IsUnique(false, innerJoinKey...)) {
count, offset = topN.Count, topN.Offset
}
}

newTopN := LogicalTopN{
Count: topN.Count + topN.Offset,
Count: count,
Offset: offset,
ByItems: make([]*util.ByItems, len(topN.ByItems)),
PreferLimitToCop: topN.PreferLimitToCop,
}.Init(topN.SCtx(), topN.QueryBlockOffset())
Expand Down
16 changes: 15 additions & 1 deletion pkg/planner/core/testdata/plan_suite_unexported_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,22 @@
"select a, b from (select @i as a, @i := @i+1 as b from t) t order by a desc limit 1",
// Test TopN + Left Join + Proj.
"select * from t left outer join t s on t.a = s.a order by t.a limit 5",
// Test TopN + Left Join + Proj.
// Test TopN(with offset) + Left Join + Proj, outer side join key is unique key and not null.
"select * from t left outer join t s on t.a = s.a order by t.a limit 5, 5",
// Test TopN(with offset) + Left Join + Proj, outer side join key is unique key and not null.
"select * from t left outer join t s on t.f = s.f and t.g = s.g order by t.f, t.g limit 5, 5",
// Test TopN(with offset) + Left Join + Proj, outer side join key is unique key and not null.
"select * from t left outer join t s on t.f = s.f and t.g = s.g order by t.g, t.f limit 5, 5",
// Test TopN(with offset) + Left Join + Proj, outer side join key is part of unique key and not null.
"select * from t left outer join t s on t.g = s.g order by t.g limit 5, 5",
// Test TopN(with offset) + Left Join + Proj, outer side join key is part of unique key and not null.
"select * from t left outer join t s on t.g = s.g order by t.f, t.g limit 5, 5",
// Test TopN(with offset) + Left Join + Proj, outer side join key is unique key(c, d, e) but nullable.
"select * from t left outer join t s on t.c = s.c and t.d=s.d and t.e=s.e order by t.c, t.d, t.e limit 5, 5",
// Test TopN(with offset) + Left Join + Proj, outer side join key is part of unique key(c, d, e) but nullable.
"select * from t left outer join t s on t.c = s.c and t.d=s.d order by t.c, t.d limit 5, 5",
// Test TopN(with offset) + Left Join + Proj, outer side join key is not unique.
"select * from t left outer join t s on t.b = s.b order by t.b limit 5, 5",
// Test Limit + Left Join + Proj.
"select * from t left outer join t s on t.a = s.a limit 5",
// Test Limit + Left Join Apply + Proj.
Expand Down
9 changes: 8 additions & 1 deletion pkg/planner/core/testdata/plan_suite_unexported_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,14 @@
"Join{DataScan(t)->DataScan(s)}->Limit->Projection",
"DataScan(t)->Projection->TopN([Column#13 true],0,1)->Projection",
"Join{DataScan(t)->TopN([test.t.a],0,5)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a],0,5)->Projection",
"Join{DataScan(t)->TopN([test.t.a],0,10)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.a],5,5)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.f test.t.g],5,5)->DataScan(s)}(test.t.f,test.t.f)(test.t.g,test.t.g)->TopN([test.t.f test.t.g],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.g test.t.f],5,5)->DataScan(s)}(test.t.f,test.t.f)(test.t.g,test.t.g)->TopN([test.t.g test.t.f],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.g],0,10)->DataScan(s)}(test.t.g,test.t.g)->TopN([test.t.g],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.f test.t.g],0,10)->DataScan(s)}(test.t.g,test.t.g)->TopN([test.t.f test.t.g],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.c test.t.d test.t.e],5,5)->DataScan(s)}(test.t.c,test.t.c)(test.t.d,test.t.d)(test.t.e,test.t.e)->TopN([test.t.c test.t.d test.t.e],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.c test.t.d],0,10)->DataScan(s)}(test.t.c,test.t.c)(test.t.d,test.t.d)->TopN([test.t.c test.t.d],5,5)->Projection",
"Join{DataScan(t)->TopN([test.t.b],0,10)->DataScan(s)}(test.t.b,test.t.b)->TopN([test.t.b],5,5)->Projection",
"Join{DataScan(t)->Limit->DataScan(s)}(test.t.a,test.t.a)->Limit->Projection",
"Join{DataScan(t)->Limit->DataScan(s)}(test.t.a,test.t.a)->Limit->Projection",
"Join{DataScan(t)->TopN([test.t.a],0,5)->DataScan(s)}(test.t.a,test.t.a)->TopN([test.t.a],0,5)->Projection",
Expand Down