From ec6ff8056edc5430d133dc6e3ab6b827baf966fb Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Fri, 1 Nov 2024 15:20:24 +0800 Subject: [PATCH] optimize --- pkg/executor/join/BUILD.bazel | 2 - pkg/executor/join/inner_join_probe_test.go | 7 +- .../join/left_outer_semi_join_probe.go | 242 ++++-------------- 3 files changed, 52 insertions(+), 199 deletions(-) diff --git a/pkg/executor/join/BUILD.bazel b/pkg/executor/join/BUILD.bazel index ce37a9153fee5..a9a9ef5372587 100644 --- a/pkg/executor/join/BUILD.bazel +++ b/pkg/executor/join/BUILD.bazel @@ -115,13 +115,11 @@ go_test( "//pkg/util/codec", "//pkg/util/disk", "//pkg/util/hack", - "//pkg/util/logutil", "//pkg/util/memory", "//pkg/util/mock", "//pkg/util/sqlkiller", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", - "@org_uber_go_zap//:zap", ], ) diff --git a/pkg/executor/join/inner_join_probe_test.go b/pkg/executor/join/inner_join_probe_test.go index 2bd531cbeece8..3362dbfa5b1d2 100644 --- a/pkg/executor/join/inner_join_probe_test.go +++ b/pkg/executor/join/inner_join_probe_test.go @@ -29,11 +29,9 @@ import ( "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/codec" - "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/mock" "github.com/pingcap/tidb/pkg/util/sqlkiller" "github.com/stretchr/testify/require" - "go.uber.org/zap" ) func toNullableTypes(tps []*types.FieldType) []*types.FieldType { @@ -194,9 +192,6 @@ func checkChunksEqual(t *testing.T, expectedChunks []*chunk.Chunk, resultChunks // used for debug x = cmp(expectedRows[i], resultRows[i]) } - if x != 0 { - logutil.BgLogger().Info("expected", zap.Any("expected", expectedRows[i].ToString(schema)), zap.Any("result", resultRows[i].ToString(schema))) - } require.Equal(t, 0, x, "result index = "+strconv.Itoa(i)) } } @@ -307,7 +302,7 @@ func testJoinProbe(t *testing.T, withSel bool, leftKeyIndex []int, rightKeyIndex } } if joinType == logicalop.LeftOuterSemiJoin { - resultTypes = append(resultTypes, types.NewFieldType(mysql.TypeLonglong)) + resultTypes = append(resultTypes, types.NewFieldType(mysql.TypeTiny)) } meta := newTableMeta(buildKeyIndex, buildTypes, buildKeyTypes, probeKeyTypes, buildUsedByOtherCondition, buildUsed, needUsedFlag) diff --git a/pkg/executor/join/left_outer_semi_join_probe.go b/pkg/executor/join/left_outer_semi_join_probe.go index 186c0f54af89c..6bdf84de8083c 100644 --- a/pkg/executor/join/left_outer_semi_join_probe.go +++ b/pkg/executor/join/left_outer_semi_join_probe.go @@ -22,26 +22,23 @@ import ( type leftOuterSemiJoinProbe struct { baseJoinProbe - // used when use inner side to build, isNotMatchedRows is indexed by logical row index - isNotMatchedRows []bool - // used when use inner side to build, hasNullRows is indexed by logical row index - hasNullRows []bool - // used when use inner side to build, isNullRows is indexed by physical row index - isNullRows []bool // build/probe side used columns and offset in result chunk probeColUsed []int + // used when use inner side to build, isMatchedRows is indexed by logical row index + isMatchedRows []bool + // used when use inner side to build, isNullRows is indexed by logical row index + isNullRows []bool + + // buffer isNull for other condition evaluation + isNulls []bool + // One probe row may match multi build rows and generate multi result rows. // We see these result rows that generated by the same probe row as one group. - groupMark []int nextProcessProbeRowIdx int // used in other condition to record which rows are being processed now processedProbeRowIdxSet map[int]struct{} - - // used in other condition to record if `j.selected` of this row has been set to true - // so that we can skip this row. - skipRowIdxSet map[int]struct{} } var _ ProbeV2 = &leftOuterSemiJoinProbe{} @@ -50,7 +47,6 @@ func newLeftOuterSemiJoinProbe(base baseJoinProbe) *leftOuterSemiJoinProbe { probe := &leftOuterSemiJoinProbe{ baseJoinProbe: base, processedProbeRowIdxSet: make(map[int]struct{}), - skipRowIdxSet: make(map[int]struct{}), } probe.probeColUsed = base.lUsed return probe @@ -61,13 +57,13 @@ func (j *leftOuterSemiJoinProbe) SetChunkForProbe(chunk *chunk.Chunk) (err error if err != nil { return err } - j.isNotMatchedRows = j.isNotMatchedRows[:0] + j.isMatchedRows = j.isMatchedRows[:0] for i := 0; i < j.chunkRows; i++ { - j.isNotMatchedRows = append(j.isNotMatchedRows, true) + j.isMatchedRows = append(j.isMatchedRows, false) } - j.hasNullRows = j.hasNullRows[:0] + j.isNullRows = j.isNullRows[:0] for i := 0; i < j.chunkRows; i++ { - j.hasNullRows = append(j.hasNullRows, false) + j.isNullRows = append(j.isNullRows, false) } return nil } @@ -89,23 +85,14 @@ func (*leftOuterSemiJoinProbe) ScanRowTable(joinResult *hashjoinWorkerResult, _ } func (j *leftOuterSemiJoinProbe) Probe(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) (ok bool, _ *hashjoinWorkerResult) { - if joinResult.chk.IsFull() { - return true, joinResult - } joinedChk, remainCap, err := j.prepareForProbe(joinResult.chk) if err != nil { joinResult.err = err return false, joinResult } - isInCompleteChunk := joinedChk.IsInCompleteChunk() - // in case that virtual rows is not maintained correctly - joinedChk.SetNumVirtualRows(joinedChk.NumRows()) - // always set in complete chunk during probe - joinedChk.SetInCompleteChunk(true) - defer joinedChk.SetInCompleteChunk(isInCompleteChunk) if j.ctx.hasOtherCondition() { - err = j.probeForInnerSideBuildWithOtherCondition(joinResult.chk, joinedChk, remainCap, sqlKiller) + err = j.probeForInnerSideBuildWithOtherCondition(joinResult.chk, joinedChk, sqlKiller) } else { err = j.probeForInnerSideBuildWithoutOtherCondition(joinResult.chk, joinedChk, remainCap, sqlKiller) } @@ -116,7 +103,7 @@ func (j *leftOuterSemiJoinProbe) Probe(joinResult *hashjoinWorkerResult, sqlKill return true, joinResult } -func (j *leftOuterSemiJoinProbe) probeForInnerSideBuildWithOtherCondition(chk, joinedChk *chunk.Chunk, _ int, sqlKiller *sqlkiller.SQLKiller) (err error) { +func (j *leftOuterSemiJoinProbe) probeForInnerSideBuildWithOtherCondition(chk, joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller) (err error) { j.nextProcessProbeRowIdx = j.currentProbeRow err = j.concatenateProbeAndBuildRows(joinedChk, sqlKiller) if err != nil { @@ -126,59 +113,25 @@ func (j *leftOuterSemiJoinProbe) probeForInnerSideBuildWithOtherCondition(chk, j // To avoid `Previous chunk is not probed yet` error j.currentProbeRow = j.nextProcessProbeRowIdx if joinedChk.NumRows() > 0 { - j.selected = j.selected[:0] - j.isNullRows = j.isNullRows[:0] - j.selected, j.isNullRows, err = expression.VecEvalBool(j.ctx.SessCtx.GetExprCtx().GetEvalCtx(), false, j.ctx.OtherCondition, joinedChk, j.selected, j.isNullRows) + j.selected, j.isNulls, err = expression.VecEvalBool(j.ctx.SessCtx.GetExprCtx().GetEvalCtx(), false, j.ctx.OtherCondition, joinedChk, j.selected, j.isNulls) if err != nil { return err } - j.truncateSelect() - j.removeMatchedProbeRow() - j.buildResultForMatchedRowsAfterOtherCondition(chk, joinedChk) - } - - if j.currentProbeRow == j.chunkRows { - j.buildResultForNotMatchedRows(chk, 0) - } - return -} - -func (j *leftOuterSemiJoinProbe) buildResultForMatchedRowsAfterOtherCondition(chk, joinedChk *chunk.Chunk) { - rowCount := chk.NumRows() - for index, colIndex := range j.probeColUsed { - dstCol := chk.Column(index) - if joinedChk.Column(colIndex).Rows() > 0 { - // probe column that is already in joinedChk - srcCol := joinedChk.Column(colIndex) - chunk.CopySelectedRows(dstCol, srcCol, j.selected) - } else { - srcCol := j.currentChunk.Column(colIndex) - chunk.CopySelectedRowsWithRowIDFunc(dstCol, srcCol, j.selected, 0, len(j.selected), func(i int) int { - ret := j.rowIndexInfos[i].probeRowIndex - return j.usedRows[ret] - }) - } - } - - rowsAdded := 0 - for i, result := range j.selected { - if result { - j.isNotMatchedRows[j.rowIndexInfos[i].probeRowIndex] = false - rowsAdded++ - } - } - for i, isNull := range j.isNullRows { - if isNull { - j.hasNullRows[j.rowIndexInfos[i].probeRowIndex] = true + for i := 0; i < joinedChk.NumRows(); i++ { + if j.selected[i] { + j.isMatchedRows[j.rowIndexInfos[i].probeRowIndex] = true + } + if j.isNulls[i] { + j.isNullRows[j.rowIndexInfos[i].probeRowIndex] = true + } } } - chk.SetNumVirtualRows(rowCount + rowsAdded) - dstCol := chk.Column(len(j.probeColUsed)) - for i := 0; i < rowsAdded; i++ { - dstCol.AppendInt64(1) + if j.currentProbeRow == j.chunkRows && len(j.processedProbeRowIdxSet) == 0 { + j.buildResult(chk, 0) } + return } func (j *leftOuterSemiJoinProbe) probeForInnerSideBuildWithoutOtherCondition(_, joinedChk *chunk.Chunk, remainCap int, sqlKiller *sqlkiller.SQLKiller) (err error) { @@ -189,23 +142,16 @@ func (j *leftOuterSemiJoinProbe) probeForInnerSideBuildWithoutOtherCondition(_, for remainCap > 0 && j.currentProbeRow < j.chunkRows { if j.matchedRowsHeaders[j.currentProbeRow] != 0 { candidateRow := tagHelper.toUnsafePointer(j.matchedRowsHeaders[j.currentProbeRow]) - if isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) { - j.appendBuildRowToCachedBuildRowsV1(j.currentProbeRow, candidateRow, joinedChk, 0, false) - j.matchedRowsForCurrentProbeRow++ - j.finishLookupCurrentProbeRow() - j.isNotMatchedRows[j.currentProbeRow] = false - j.matchedRowsHeaders[j.currentProbeRow] = 0 - j.currentProbeRow++ - remainCap-- + if !isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) { + j.probeCollision++ + j.matchedRowsHeaders[j.currentProbeRow] = getNextRowAddress(candidateRow, tagHelper, j.matchedRowsHashValue[j.currentProbeRow]) continue } - j.probeCollision++ - j.matchedRowsHeaders[j.currentProbeRow] = getNextRowAddress(candidateRow, tagHelper, j.matchedRowsHashValue[j.currentProbeRow]) - } else { - j.finishLookupCurrentProbeRow() - remainCap-- - j.currentProbeRow++ + j.isMatchedRows[j.currentProbeRow] = true } + j.matchedRowsHeaders[j.currentProbeRow] = 0 + remainCap-- + j.currentProbeRow++ } err = checkSQLKiller(sqlKiller, "killedDuringProbe") @@ -214,54 +160,30 @@ func (j *leftOuterSemiJoinProbe) probeForInnerSideBuildWithoutOtherCondition(_, return err } - j.finishCurrentLookupLoop(joinedChk, startProbeRow) - // if no the condition, chk == joinedChk, and the matched rows are already in joinedChk - j.buildResultForNotMatchedRows(joinedChk, startProbeRow) + j.buildResult(joinedChk, startProbeRow) return nil } -func (j *leftOuterSemiJoinProbe) buildResultForNotMatchedRows(chk *chunk.Chunk, startProbeRow int) { - // append not matched rows - // for not matched rows, probe col is appended using original cols, and build column is appended using nulls - prevRows := chk.NumRows() - afterRows := prevRows +func (j *leftOuterSemiJoinProbe) buildResult(chk *chunk.Chunk, startProbeRow int) { + selected := make([]bool, j.chunkRows) + for i := startProbeRow; i < j.currentProbeRow; i++ { + selected[i] = true + } for index, colIndex := range j.probeColUsed { dstCol := chk.Column(index) srcCol := j.currentChunk.Column(colIndex) - chunk.CopySelectedRowsWithRowIDFunc(dstCol, srcCol, j.isNotMatchedRows, startProbeRow, j.currentProbeRow, func(i int) int { - return j.usedRows[i] - }) - afterRows = dstCol.Rows() - } - nullRows := afterRows - prevRows - if len(j.probeColUsed) == 0 { - for i := startProbeRow; i < j.currentProbeRow; i++ { - if j.isNotMatchedRows[i] { - nullRows++ - } - } + chunk.CopySelectedRows(dstCol, srcCol, selected) } for i := startProbeRow; i < j.currentProbeRow; i++ { - if j.isNotMatchedRows[i] { - if j.hasNullRows[i] { - chk.AppendNull(len(j.probeColUsed)) - } else { - chk.AppendInt64(len(j.probeColUsed), 0) - } - } - } - chk.SetNumVirtualRows(prevRows + nullRows) -} - -func (j *leftOuterSemiJoinProbe) finishCurrentLookupLoop(joinedChk *chunk.Chunk, startProbeRow int) { - j.baseJoinProbe.finishCurrentLookupLoop(joinedChk) - if !j.ctx.hasOtherCondition() { - for i := startProbeRow; i < j.currentProbeRow; i++ { - if !j.isNotMatchedRows[i] { - joinedChk.AppendInt64(len(j.probeColUsed), 1) - } + if j.isMatchedRows[i] { + chk.AppendInt64(len(j.probeColUsed), 1) + } else if j.isNullRows[i] { + chk.AppendNull(len(j.probeColUsed)) + } else { + chk.AppendInt64(len(j.probeColUsed), 0) } } + chk.SetNumVirtualRows(chk.NumRows()) } var maxMatchedRowNum = 4 @@ -275,7 +197,6 @@ func (j *leftOuterSemiJoinProbe) matchMultiBuildRows(joinedChk *chunk.Chunk, joi j.appendBuildRowToCachedBuildRowsV1(j.currentProbeRow, candidateRow, joinedChk, 0, true) j.matchedRowsForCurrentProbeRow++ *joinedChkRemainCap-- - j.groupMark = append(j.groupMark, j.currentProbeRow) } else { j.probeCollision++ } @@ -286,11 +207,14 @@ func (j *leftOuterSemiJoinProbe) matchMultiBuildRows(joinedChk *chunk.Chunk, joi } func (j *leftOuterSemiJoinProbe) concatenateProbeAndBuildRows(joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller) error { - j.groupMark = j.groupMark[:0] joinedChkRemainCap := joinedChk.Capacity() for joinedChkRemainCap > 0 && (len(j.processedProbeRowIdxSet) > 0 || j.nextProcessProbeRowIdx < j.chunkRows) { for probeRowIdx := range j.processedProbeRowIdxSet { + if j.isMatchedRows[probeRowIdx] { + delete(j.processedProbeRowIdxSet, probeRowIdx) + continue + } j.currentProbeRow = probeRowIdx j.matchMultiBuildRows(joinedChk, &joinedChkRemainCap) @@ -319,70 +243,6 @@ func (j *leftOuterSemiJoinProbe) concatenateProbeAndBuildRows(joinedChk *chunk.C return err } - j.finishCurrentLookupLoop(joinedChk, 0) + j.finishCurrentLookupLoop(joinedChk) return nil } - -// One probe row may match multi build rows and generate multi result rows. -// In semi join, we only need one row when left table is probe side, so we need to truncate -// the `j.select` to meet the rule that one probe row outputs at most one result row. -func (j *leftOuterSemiJoinProbe) truncateSelect() { - clear(j.skipRowIdxSet) - - // scannedGroupIdx refers to which group we are scanning now - scannedGroupIdx := -1 - selectLen := len(j.selected) - - // Set it to true when we encounter true in a group - encounterTrue := false - - for i := 0; i < selectLen; i++ { - currentGroupIdx := j.groupMark[i] - if scannedGroupIdx != currentGroupIdx { - // Switch to a new group - _, ok := j.skipRowIdxSet[currentGroupIdx] - if ok { - // When row index n is set to true in `j.selected`, it will be outputted as result. - // However, in semi join, rows in left side should be outputted for only once. - // - // When one left side row has been set to true before, we should not output it again. - // groupMark: | 0 | 0 | 0 | 0 | 1 | 1 | 2 | 2 | 2 | 0 | 0 | 1 | 2 | - // selected: | T | F | T | F | T | T | F | F | F | T | F | F | T | - // ▲ ▲ - // └ output index 0 here | - // | - // index 0 was true before, we should not output it again - encounterTrue = true - j.selected[i] = false - } else { - encounterTrue = j.selected[i] - if encounterTrue { - j.skipRowIdxSet[currentGroupIdx] = struct{}{} - } - } - - scannedGroupIdx = currentGroupIdx - continue - } - - if encounterTrue { - // When we have encountered true in a group, - // we need to set the rest of selected values to false. - j.selected[i] = false - continue - } - - encounterTrue = j.selected[i] - if encounterTrue { - j.skipRowIdxSet[currentGroupIdx] = struct{}{} - } - } -} - -func (j *leftOuterSemiJoinProbe) removeMatchedProbeRow() { - for idx, selected := range j.selected { - if selected { - delete(j.processedProbeRowIdxSet, j.groupMark[idx]) - } - } -}