Skip to content

Commit

Permalink
executor: support left outer anti semi join for hash join v2 (#58479)
Browse files Browse the repository at this point in the history
ref #53127
  • Loading branch information
wshwsh12 authored Dec 27, 2024
1 parent 9b70321 commit 3c40731
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 37 deletions.
1 change: 1 addition & 0 deletions pkg/executor/join/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ go_test(
"join_stats_test.go",
"join_table_meta_test.go",
"joiner_test.go",
"left_outer_anti_semi_join_probe_test.go",
"left_outer_join_probe_test.go",
"left_outer_semi_join_probe_test.go",
"merge_join_test.go",
Expand Down
12 changes: 10 additions & 2 deletions pkg/executor/join/base_join_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,9 +793,17 @@ func NewJoinProbe(ctx *HashJoinCtxV2, workID uint, joinType logicalop.JoinType,
panic("len(base.rUsed) != 0 for left outer semi join")
}
if rightAsBuildSide {
return newLeftOuterSemiJoinProbe(base)
return newLeftOuterSemiJoinProbe(base, false)
}
fallthrough
panic("unsupported join type")
case logicalop.AntiLeftOuterSemiJoin:
if len(base.rUsed) != 0 {
panic("len(base.rUsed) != 0 for left outer anti semi join")
}
if rightAsBuildSide {
return newLeftOuterSemiJoinProbe(base, true)
}
panic("unsupported join type")
default:
panic("unsupported join type")
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/executor/join/inner_join_probe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func testJoinProbe(t *testing.T, withSel bool, leftKeyIndex []int, rightKeyIndex
resultTypes[len(resultTypes)-1].DelFlag(mysql.NotNullFlag)
}
}
if joinType == logicalop.LeftOuterSemiJoin {
if joinType == logicalop.LeftOuterSemiJoin || joinType == logicalop.AntiLeftOuterSemiJoin {
resultTypes = append(resultTypes, types.NewFieldType(mysql.TypeTiny))
}

Expand Down Expand Up @@ -473,6 +473,10 @@ func testJoinProbe(t *testing.T, withSel bool, leftKeyIndex []int, rightKeyIndex
expectedChunks := genAntiSemiJoinResult(t, hashJoinCtx.SessCtx, leftChunks, rightChunks, leftKeyIndex, rightKeyIndex, leftTypes,
rightTypes, leftKeyTypes, rightKeyTypes, leftUsed, otherCondition, resultTypes)
checkChunksEqual(t, expectedChunks, resultChunks, resultTypes)
case logicalop.AntiLeftOuterSemiJoin:
expectedChunks := genLeftOuterAntiSemiJoinResult(t, hashJoinCtx.SessCtx, leftFilter, leftChunks, rightChunks, leftKeyIndex, rightKeyIndex, leftTypes,
rightTypes, leftKeyTypes, rightKeyTypes, leftUsed, otherCondition, resultTypes)
checkChunksEqual(t, expectedChunks, resultChunks, resultTypes)
default:
require.NoError(t, errors.New("not supported join type"))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/join/inner_join_spill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func getReturnTypes(joinType logicalop.JoinType, param spillTestParam) []*types.
resultTypes[len(resultTypes)-1].DelFlag(mysql.NotNullFlag)
}
}
if joinType == logicalop.LeftOuterSemiJoin {
if joinType == logicalop.LeftOuterSemiJoin || joinType == logicalop.AntiLeftOuterSemiJoin {
resultTypes = append(resultTypes, types.NewFieldType(mysql.TypeTiny))
}
return resultTypes
Expand Down
54 changes: 54 additions & 0 deletions pkg/executor/join/left_outer_anti_semi_join_probe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package join

import (
"testing"

"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
)

// genLeftOuterAntiSemiJoinResult produces the expected result chunks for a
// left outer anti semi join. It forwards every argument unchanged to the
// shared result generator and enables both its left-outer and anti flags.
func genLeftOuterAntiSemiJoinResult(t *testing.T, sessCtx sessionctx.Context, leftFilter expression.CNFExprs, leftChunks []*chunk.Chunk, rightChunks []*chunk.Chunk, leftKeyIndex []int, rightKeyIndex []int,
	leftTypes []*types.FieldType, rightTypes []*types.FieldType, leftKeyTypes []*types.FieldType, rightKeyTypes []*types.FieldType, leftUsedColumns []int, otherConditions expression.CNFExprs,
	resultTypes []*types.FieldType) []*chunk.Chunk {
	// Name the trailing boolean flags so the call site is self-describing.
	const (
		isLeftOuter = true
		isAnti      = true
	)
	return genLeftOuterSemiOrSemiJoinOrLeftOuterAntiSemiResultImpl(
		t, sessCtx, leftFilter,
		leftChunks, rightChunks,
		leftKeyIndex, rightKeyIndex,
		leftTypes, rightTypes,
		leftKeyTypes, rightKeyTypes,
		leftUsedColumns, otherConditions, resultTypes,
		isLeftOuter, isAnti,
	)
}

// TestLeftOuterAntiSemiJoinProbeBasic runs the shared basic probe test with
// the left-outer and anti flags both set.
func TestLeftOuterAntiSemiJoinProbeBasic(t *testing.T) {
	const (
		isLeftOuter = true
		isAnti      = true
	)
	testLeftOuterSemiOrSemiJoinProbeBasic(t, isLeftOuter, isAnti)
}

// TestLeftOuterAntiSemiJoinProbeAllJoinKeys runs the shared all-join-keys
// probe test with the left-outer and anti flags both set.
func TestLeftOuterAntiSemiJoinProbeAllJoinKeys(t *testing.T) {
	const (
		isLeftOuter = true
		isAnti      = true
	)
	testLeftOuterSemiJoinProbeAllJoinKeys(t, isLeftOuter, isAnti)
}

// TestLeftOuterAntiSemiJoinProbeOtherCondition runs the shared
// other-condition probe test with the left-outer and anti flags both set.
func TestLeftOuterAntiSemiJoinProbeOtherCondition(t *testing.T) {
	const (
		isLeftOuter = true
		isAnti      = true
	)
	testLeftOuterSemiJoinProbeOtherCondition(t, isLeftOuter, isAnti)
}

// TestLeftOuterAntiSemiJoinProbeWithSel runs the shared with-sel probe test
// with the left-outer and anti flags both set.
func TestLeftOuterAntiSemiJoinProbeWithSel(t *testing.T) {
	const (
		isLeftOuter = true
		isAnti      = true
	)
	testLeftOuterSemiJoinProbeWithSel(t, isLeftOuter, isAnti)
}

// TestLeftOuterAntiSemiJoinBuildResultFastPath runs the shared build-result
// fast-path test with the anti flag set.
func TestLeftOuterAntiSemiJoinBuildResultFastPath(t *testing.T) {
	const isAnti = true
	testLeftOuterSemiJoinOrLeftOuterAntiSemiJoinBuildResultFastPath(t, isAnti)
}

// TestLeftOuterAntiSemiJoinSpill runs the shared spill test with the anti
// flag set.
func TestLeftOuterAntiSemiJoinSpill(t *testing.T) {
	const isAnti = true
	testLeftOuterSemiJoinOrLeftOuterAntiSemiJoinSpill(t, isAnti)
}
39 changes: 29 additions & 10 deletions pkg/executor/join/left_outer_semi_join_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ type leftOuterSemiJoinProbe struct {

// isNullRows marks whether the left side row matched result is null
isNullRows []bool
// isAnti marks whether the join is anti semi join
isAnti bool
}

var _ ProbeV2 = &leftOuterSemiJoinProbe{}

func newLeftOuterSemiJoinProbe(base baseJoinProbe) *leftOuterSemiJoinProbe {
func newLeftOuterSemiJoinProbe(base baseJoinProbe, isAnti bool) *leftOuterSemiJoinProbe {
probe := &leftOuterSemiJoinProbe{
baseSemiJoin: *newBaseSemiJoin(base, false),
isAnti: isAnti,
}
return probe
}
Expand Down Expand Up @@ -193,18 +196,34 @@ func (j *leftOuterSemiJoinProbe) buildResult(chk *chunk.Chunk, startProbeRow int
}
}

for i := startProbeRow; i < j.currentProbeRow; i++ {
if selected != nil && !selected[i] {
continue
if j.isAnti {
for i := startProbeRow; i < j.currentProbeRow; i++ {
if selected != nil && !selected[i] {
continue
}
if j.isMatchedRows[i] {
chk.AppendInt64(len(j.lUsed), 0)
} else if j.isNullRows[i] {
chk.AppendNull(len(j.lUsed))
} else {
chk.AppendInt64(len(j.lUsed), 1)
}
}
if j.isMatchedRows[i] {
chk.AppendInt64(len(j.lUsed), 1)
} else if j.isNullRows[i] {
chk.AppendNull(len(j.lUsed))
} else {
chk.AppendInt64(len(j.lUsed), 0)
} else {
for i := startProbeRow; i < j.currentProbeRow; i++ {
if selected != nil && !selected[i] {
continue
}
if j.isMatchedRows[i] {
chk.AppendInt64(len(j.lUsed), 1)
} else if j.isNullRows[i] {
chk.AppendNull(len(j.lUsed))
} else {
chk.AppendInt64(len(j.lUsed), 0)
}
}
}

chk.SetNumVirtualRows(chk.NumRows())
}

Expand Down
82 changes: 63 additions & 19 deletions pkg/executor/join/left_outer_semi_join_probe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,19 @@ import (
func genLeftOuterSemiJoinResult(t *testing.T, sessCtx sessionctx.Context, leftFilter expression.CNFExprs, leftChunks []*chunk.Chunk, rightChunks []*chunk.Chunk, leftKeyIndex []int, rightKeyIndex []int,
leftTypes []*types.FieldType, rightTypes []*types.FieldType, leftKeyTypes []*types.FieldType, rightKeyTypes []*types.FieldType, leftUsedColumns []int, otherConditions expression.CNFExprs,
resultTypes []*types.FieldType) []*chunk.Chunk {
return genLeftOuterSemiOrSemiJoinResultImpl(t, sessCtx, leftFilter, leftChunks, rightChunks, leftKeyIndex, rightKeyIndex, leftTypes, rightTypes, leftKeyTypes, rightKeyTypes, leftUsedColumns, otherConditions, resultTypes, true)
return genLeftOuterSemiOrSemiJoinOrLeftOuterAntiSemiResultImpl(t, sessCtx, leftFilter, leftChunks, rightChunks, leftKeyIndex, rightKeyIndex, leftTypes, rightTypes, leftKeyTypes, rightKeyTypes, leftUsedColumns, otherConditions, resultTypes, true, false)
}

func genSemiJoinResult(t *testing.T, sessCtx sessionctx.Context, leftFilter expression.CNFExprs, leftChunks []*chunk.Chunk, rightChunks []*chunk.Chunk, leftKeyIndex []int, rightKeyIndex []int,
leftTypes []*types.FieldType, rightTypes []*types.FieldType, leftKeyTypes []*types.FieldType, rightKeyTypes []*types.FieldType, leftUsedColumns []int, otherConditions expression.CNFExprs,
resultTypes []*types.FieldType) []*chunk.Chunk {
return genLeftOuterSemiOrSemiJoinResultImpl(t, sessCtx, leftFilter, leftChunks, rightChunks, leftKeyIndex, rightKeyIndex, leftTypes, rightTypes, leftKeyTypes, rightKeyTypes, leftUsedColumns, otherConditions, resultTypes, false)
return genLeftOuterSemiOrSemiJoinOrLeftOuterAntiSemiResultImpl(t, sessCtx, leftFilter, leftChunks, rightChunks, leftKeyIndex, rightKeyIndex, leftTypes, rightTypes, leftKeyTypes, rightKeyTypes, leftUsedColumns, otherConditions, resultTypes, false, false)
}

// generate left outer semi join result using nested loop
func genLeftOuterSemiOrSemiJoinResultImpl(t *testing.T, sessCtx sessionctx.Context, leftFilter expression.CNFExprs, leftChunks []*chunk.Chunk, rightChunks []*chunk.Chunk, leftKeyIndex []int, rightKeyIndex []int,
func genLeftOuterSemiOrSemiJoinOrLeftOuterAntiSemiResultImpl(t *testing.T, sessCtx sessionctx.Context, leftFilter expression.CNFExprs, leftChunks []*chunk.Chunk, rightChunks []*chunk.Chunk, leftKeyIndex []int, rightKeyIndex []int,
leftTypes []*types.FieldType, rightTypes []*types.FieldType, leftKeyTypes []*types.FieldType, rightKeyTypes []*types.FieldType, leftUsedColumns []int, otherConditions expression.CNFExprs,
resultTypes []*types.FieldType, isLeftOuter bool) []*chunk.Chunk {
resultTypes []*types.FieldType, isLeftOuter bool, isAnti bool) []*chunk.Chunk {
filterVector := make([]bool, 0)
var err error
returnChks := make([]*chunk.Chunk, 0, 1)
Expand All @@ -67,9 +67,15 @@ func genLeftOuterSemiOrSemiJoinResultImpl(t *testing.T, sessCtx sessionctx.Conte
}
if leftFilter != nil && !filterVector[filterIndex] {
if isLeftOuter {
// Filtered by left filter, append 0 for matched flag
// Filtered by left filter
// Left Outer Semi Join: append 0 for matched flag
// Left Outer Anti Semi Join: append 1 for matched flag
appendToResultChk(leftChunk.GetRow(leftIndex), chunk.Row{}, leftUsedColumns, nil, resultChk)
resultChk.AppendInt64(len(leftUsedColumns), 0)
if isAnti {
resultChk.AppendInt64(len(leftUsedColumns), 1)
} else {
resultChk.AppendInt64(len(leftUsedColumns), 0)
}
}

if resultChk.IsFull() {
Expand Down Expand Up @@ -117,13 +123,25 @@ func genLeftOuterSemiOrSemiJoinResultImpl(t *testing.T, sessCtx sessionctx.Conte
if isLeftOuter {
// Append result with matched flag
appendToResultChk(leftRow, chunk.Row{}, leftUsedColumns, nil, resultChk)
if hasMatch {
resultChk.AppendInt64(len(leftUsedColumns), 1)
if isAnti {
if hasMatch {
resultChk.AppendInt64(len(leftUsedColumns), 0)
} else {
if hasNull {
resultChk.AppendNull(len(leftUsedColumns))
} else {
resultChk.AppendInt64(len(leftUsedColumns), 1)
}
}
} else {
if hasNull {
resultChk.AppendNull(len(leftUsedColumns))
if hasMatch {
resultChk.AppendInt64(len(leftUsedColumns), 1)
} else {
resultChk.AppendInt64(len(leftUsedColumns), 0)
if hasNull {
resultChk.AppendNull(len(leftUsedColumns))
} else {
resultChk.AppendInt64(len(leftUsedColumns), 0)
}
}
}
} else {
Expand All @@ -144,7 +162,7 @@ func genLeftOuterSemiOrSemiJoinResultImpl(t *testing.T, sessCtx sessionctx.Conte
return returnChks
}

func testLeftOuterSemiOrSemiJoinProbeBasic(t *testing.T, isLeftOuter bool) {
func testLeftOuterSemiOrSemiJoinProbeBasic(t *testing.T, isLeftOuter bool, isAnti bool) {
// todo test nullable type after builder support nullable type
tinyTp := types.NewFieldType(mysql.TypeTiny)
tinyTp.AddFlag(mysql.NotNullFlag)
Expand Down Expand Up @@ -177,6 +195,9 @@ func testLeftOuterSemiOrSemiJoinProbeBasic(t *testing.T, isLeftOuter bool) {
var joinType logicalop.JoinType
if isLeftOuter {
joinType = logicalop.LeftOuterSemiJoin
if isAnti {
joinType = logicalop.AntiLeftOuterSemiJoin
}
} else {
joinType = logicalop.SemiJoin
}
Expand Down Expand Up @@ -215,7 +236,7 @@ func testLeftOuterSemiOrSemiJoinProbeBasic(t *testing.T, isLeftOuter bool) {
}
}

func testLeftOuterSemiJoinProbeAllJoinKeys(t *testing.T, isLeftOuter bool) {
func testLeftOuterSemiJoinProbeAllJoinKeys(t *testing.T, isLeftOuter bool, isAnti bool) {
tinyTp := types.NewFieldType(mysql.TypeTiny)
tinyTp.AddFlag(mysql.NotNullFlag)
intTp := types.NewFieldType(mysql.TypeLonglong)
Expand Down Expand Up @@ -262,6 +283,9 @@ func testLeftOuterSemiJoinProbeAllJoinKeys(t *testing.T, isLeftOuter bool) {
var joinType logicalop.JoinType
if isLeftOuter {
joinType = logicalop.LeftOuterSemiJoin
if isAnti {
joinType = logicalop.AntiLeftOuterSemiJoin
}
} else {
joinType = logicalop.SemiJoin
}
Expand Down Expand Up @@ -312,7 +336,7 @@ func testLeftOuterSemiJoinProbeAllJoinKeys(t *testing.T, isLeftOuter bool) {
}
}

func testLeftOuterSemiJoinProbeOtherCondition(t *testing.T, isLeftOuter bool) {
func testLeftOuterSemiJoinProbeOtherCondition(t *testing.T, isLeftOuter bool, isAnti bool) {
intTp := types.NewFieldType(mysql.TypeLonglong)
intTp.AddFlag(mysql.NotNullFlag)
nullableIntTp := types.NewFieldType(mysql.TypeLonglong)
Expand Down Expand Up @@ -344,6 +368,9 @@ func testLeftOuterSemiJoinProbeOtherCondition(t *testing.T, isLeftOuter bool) {
var joinType logicalop.JoinType
if isLeftOuter {
joinType = logicalop.LeftOuterSemiJoin
if isAnti {
joinType = logicalop.AntiLeftOuterSemiJoin
}
} else {
joinType = logicalop.SemiJoin
}
Expand Down Expand Up @@ -382,7 +409,7 @@ func testLeftOuterSemiJoinProbeOtherCondition(t *testing.T, isLeftOuter bool) {
}
}

func testLeftOuterSemiJoinProbeWithSel(t *testing.T, isLeftOuter bool) {
func testLeftOuterSemiJoinProbeWithSel(t *testing.T, isLeftOuter bool, isAnti bool) {
intTp := types.NewFieldType(mysql.TypeLonglong)
intTp.AddFlag(mysql.NotNullFlag)
nullableIntTp := types.NewFieldType(mysql.TypeLonglong)
Expand All @@ -409,6 +436,9 @@ func testLeftOuterSemiJoinProbeWithSel(t *testing.T, isLeftOuter bool) {
var joinType logicalop.JoinType
if isLeftOuter {
joinType = logicalop.LeftOuterSemiJoin
if isAnti {
joinType = logicalop.AntiLeftOuterSemiJoin
}
} else {
joinType = logicalop.SemiJoin
}
Expand Down Expand Up @@ -443,22 +473,26 @@ func testLeftOuterSemiJoinProbeWithSel(t *testing.T, isLeftOuter bool) {
}

func TestLeftOuterSemiJoinProbeBasic(t *testing.T) {
testLeftOuterSemiOrSemiJoinProbeBasic(t, true)
testLeftOuterSemiOrSemiJoinProbeBasic(t, true, false)
}

func TestLeftOuterSemiJoinProbeAllJoinKeys(t *testing.T) {
testLeftOuterSemiJoinProbeAllJoinKeys(t, true)
testLeftOuterSemiJoinProbeAllJoinKeys(t, true, false)
}

func TestLeftOuterSemiJoinProbeOtherCondition(t *testing.T) {
testLeftOuterSemiJoinProbeOtherCondition(t, true)
testLeftOuterSemiJoinProbeOtherCondition(t, true, false)
}

func TestLeftOuterSemiJoinProbeWithSel(t *testing.T) {
testLeftOuterSemiJoinProbeWithSel(t, true)
testLeftOuterSemiJoinProbeWithSel(t, true, false)
}

func TestLeftOuterSemiJoinBuildResultFastPath(t *testing.T) {
testLeftOuterSemiJoinOrLeftOuterAntiSemiJoinBuildResultFastPath(t, false)
}

func testLeftOuterSemiJoinOrLeftOuterAntiSemiJoinBuildResultFastPath(t *testing.T, isAnti bool) {
intTp := types.NewFieldType(mysql.TypeLonglong)
intTp.AddFlag(mysql.NotNullFlag)
nullableIntTp := types.NewFieldType(mysql.TypeLonglong)
Expand Down Expand Up @@ -487,6 +521,9 @@ func TestLeftOuterSemiJoinBuildResultFastPath(t *testing.T) {
otherCondition2 := make(expression.CNFExprs, 0)
otherCondition2 = append(otherCondition2, sf2)
joinType := logicalop.LeftOuterSemiJoin
if isAnti {
joinType = logicalop.AntiLeftOuterSemiJoin
}
simpleFilter := createSimpleFilter(t)
hasFilter := []bool{false, true}
rightAsBuildSide := []bool{true}
Expand Down Expand Up @@ -519,6 +556,10 @@ func TestLeftOuterSemiJoinBuildResultFastPath(t *testing.T) {
}

func TestLeftOuterSemiJoinSpill(t *testing.T) {
testLeftOuterSemiJoinOrLeftOuterAntiSemiJoinSpill(t, false)
}

func testLeftOuterSemiJoinOrLeftOuterAntiSemiJoinSpill(t *testing.T, isAnti bool) {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = 32
ctx.GetSessionVars().MaxChunkSize = 32
Expand Down Expand Up @@ -554,6 +595,9 @@ func TestLeftOuterSemiJoinSpill(t *testing.T) {
spillChunkSize = 100

joinType := logicalop.LeftOuterSemiJoin
if isAnti {
joinType = logicalop.AntiLeftOuterSemiJoin
}
params := []spillTestParam{
// basic case
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 100000, 10000}},
Expand Down
Loading

0 comments on commit 3c40731

Please sign in to comment.