diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go
index 7ff162239ebb9..99bb6ceec9103 100644
--- a/executor/benchmark_test.go
+++ b/executor/benchmark_test.go
@@ -916,20 +916,18 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
 	}
 	e := &HashJoinExec{
 		baseExecutor:      newBaseExecutor(testCase.ctx, joinSchema, 5, innerExec, outerExec),
-		hashJoinCtx: &hashJoinCtx{
-			joinType:        testCase.joinType, // 0 for InnerJoin, 1 for LeftOutersJoin, 2 for RightOuterJoin
-			isOuterJoin:     false,
-			useOuterToBuild: testCase.useOuterToBuild,
-		},
-		probeSideTupleFetcher: &probeSideTupleFetcher{
+		probeSideTupleFetcher: probeSideTupleFetcher{
 			probeSideExec: outerExec,
 		},
 		probeWorkers:      make([]probeWorker, testCase.concurrency),
 		concurrency:       uint(testCase.concurrency),
+		joinType:          testCase.joinType, // 0 for InnerJoin, 1 for LeftOutersJoin, 2 for RightOuterJoin
+		isOuterJoin:       false,
 		buildKeys:         joinKeys,
 		probeKeys:         probeKeys,
 		buildSideExec:     innerExec,
 		buildSideEstCount: float64(testCase.rows),
+		useOuterToBuild:   testCase.useOuterToBuild,
 	}
 
 	childrenUsedSchema := markChildrenUsedCols(e.Schema(), e.children[0].Schema(), e.children[1].Schema())
diff --git a/executor/builder.go b/executor/builder.go
index fafe467a418f4..c2b09ad8eed53 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -1423,14 +1423,11 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
 	}
 
 	e := &HashJoinExec{
-		baseExecutor:          newBaseExecutor(b.ctx, v.Schema(), v.ID(), leftExec, rightExec),
-		probeSideTupleFetcher: &probeSideTupleFetcher{},
-		hashJoinCtx: &hashJoinCtx{
-			isOuterJoin:     v.JoinType.IsOuterJoin(),
-			useOuterToBuild: v.UseOuterToBuild,
-			joinType:        v.JoinType,
-		},
-		concurrency: v.Concurrency,
+		baseExecutor:    newBaseExecutor(b.ctx, v.Schema(), v.ID(), leftExec, rightExec),
+		concurrency:     v.Concurrency,
+		joinType:        v.JoinType,
+		isOuterJoin:     v.JoinType.IsOuterJoin(),
+		useOuterToBuild: v.UseOuterToBuild,
 	}
 	defaultValues := v.DefaultValues
 	lhsTypes, rhsTypes := retTypes(leftExec), retTypes(rightExec)
diff --git a/executor/join.go b/executor/join.go
index 7ff401970db55..ccff7d2e86516 100644
--- a/executor/join.go
+++ b/executor/join.go
@@ -46,22 +46,8 @@ var (
 	_ Executor = &NestedLoopApplyExec{}
 )
 
-type hashJoinCtx struct {
-	joinResultCh chan *hashjoinWorkerResult
-	// closeCh add a lock for closing executor.
-	closeCh         chan struct{}
-	finished        atomic.Bool
-	useOuterToBuild bool
-	isOuterJoin     bool
-	buildFinished   chan error
-	rowContainer    *hashRowContainer
-	joinType        plannercore.JoinType
-}
-
 // probeSideTupleFetcher reads tuples from probeSideExec and send them to probeWorkers.
 type probeSideTupleFetcher struct {
-	*hashJoinCtx
-
 	probeSideExec      Executor
 	probeChkResourceCh chan *probeChkResource
 	probeResultChs     []chan *chunk.Chunk
@@ -87,8 +73,7 @@ type probeWorker struct {
 type HashJoinExec struct {
 	baseExecutor
 
-	probeSideTupleFetcher *probeSideTupleFetcher
-	*hashJoinCtx
+	probeSideTupleFetcher
 	probeWorkers      []probeWorker
 	buildSideExec     Executor
 	buildSideEstCount float64
@@ -102,19 +87,29 @@ type HashJoinExec struct {
 	buildTypes []*types.FieldType
 
 	// concurrency is the number of partition, build and join workers.
-	concurrency uint
+	concurrency   uint
+	rowContainer  *hashRowContainer
+	buildFinished chan error
 
-	worker util.WaitGroupWrapper
-	waiter util.WaitGroupWrapper
+	// closeCh add a lock for closing executor.
+	closeCh  chan struct{}
+	worker   util.WaitGroupWrapper
+	waiter   util.WaitGroupWrapper
+	joinType plannercore.JoinType
 
 	joinChkResourceCh []chan *chunk.Chunk
+	joinResultCh      chan *hashjoinWorkerResult
 
 	memTracker  *memory.Tracker // track memory usage.
 	diskTracker *disk.Tracker   // track disk usage.
 
 	outerMatchedStatus []*bitmap.ConcurrentBitmap
+	useOuterToBuild    bool
 
-	prepared bool
+	prepared    bool
+	isOuterJoin bool
+
+	finished atomic.Bool
 
 	stats *hashJoinRuntimeStats
 }
@@ -219,32 +214,32 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
 
 // fetchProbeSideChunks get chunks from fetches chunks from the big table in a background goroutine
 // and sends the chunks to multiple channels which will be read by multiple join workers.
-func (fetcher *probeSideTupleFetcher) fetchProbeSideChunks(ctx context.Context, maxChunkSize int) {
+func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) {
 	hasWaitedForBuild := false
 	for {
-		if fetcher.finished.Load() {
+		if e.finished.Load() {
 			return
 		}
 
 		var probeSideResource *probeChkResource
 		var ok bool
 		select {
-		case <-fetcher.closeCh:
+		case <-e.closeCh:
 			return
-		case probeSideResource, ok = <-fetcher.probeChkResourceCh:
+		case probeSideResource, ok = <-e.probeSideTupleFetcher.probeChkResourceCh:
 			if !ok {
 				return
 			}
 		}
 		probeSideResult := probeSideResource.chk
-		if fetcher.isOuterJoin {
-			required := int(atomic.LoadInt64(&fetcher.requiredRows))
-			probeSideResult.SetRequiredRows(required, maxChunkSize)
+		if e.isOuterJoin {
+			required := int(atomic.LoadInt64(&e.probeSideTupleFetcher.requiredRows))
+			probeSideResult.SetRequiredRows(required, e.maxChunkSize)
 		}
-		err := Next(ctx, fetcher.probeSideExec, probeSideResult)
+		err := Next(ctx, e.probeSideTupleFetcher.probeSideExec, probeSideResult)
 		failpoint.Inject("ConsumeRandomPanic", nil)
 		if err != nil {
-			fetcher.joinResultCh <- &hashjoinWorkerResult{
+			e.joinResultCh <- &hashjoinWorkerResult{
 				err: err,
 			}
 			return
@@ -255,18 +250,23 @@ func (fetcher *probeSideTupleFetcher) fetchProbeSideChunks(ctx context.Context,
 					probeSideResult.Reset()
 				}
 			})
-			if probeSideResult.NumRows() == 0 && !fetcher.useOuterToBuild {
-				fetcher.finished.Store(true)
+			if probeSideResult.NumRows() == 0 && !e.useOuterToBuild {
+				e.finished.Store(true)
 			}
-			emptyBuild, buildErr := fetcher.wait4BuildSide()
+			emptyBuild, buildErr := e.wait4BuildSide()
 			if buildErr != nil {
-				fetcher.joinResultCh <- &hashjoinWorkerResult{
+				e.joinResultCh <- &hashjoinWorkerResult{
 					err: buildErr,
 				}
 				return
 			} else if emptyBuild {
 				return
 			}
+			// after building is finished. the hash null bucket slice is allocated and determined.
+			// copy it for multi probe worker.
+			for _, w := range e.probeWorkers {
+				w.rowContainerForProbe.hashNANullBucket = e.rowContainer.hashNANullBucket
+			}
 			hasWaitedForBuild = true
 		}
 
@@ -278,16 +278,16 @@ func (fetcher *probeSideTupleFetcher) fetchProbeSideChunks(ctx context.Context,
 	}
 }
 
-func (fetcher *probeSideTupleFetcher) wait4BuildSide() (emptyBuild bool, err error) {
+func (e *HashJoinExec) wait4BuildSide() (emptyBuild bool, err error) {
 	select {
-	case <-fetcher.closeCh:
+	case <-e.closeCh:
 		return true, nil
-	case err := <-fetcher.buildFinished:
+	case err := <-e.buildFinished:
 		if err != nil {
 			return false, err
 		}
 	}
-	if fetcher.rowContainer.Len() == uint64(0) && (fetcher.joinType == plannercore.InnerJoin || fetcher.joinType == plannercore.SemiJoin) {
+	if e.rowContainer.Len() == uint64(0) && (e.joinType == plannercore.InnerJoin || e.joinType == plannercore.SemiJoin) {
 		return true, nil
 	}
 	return false, nil
@@ -331,11 +331,6 @@ func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chu
 }
 
 func (e *HashJoinExec) initializeForProbe() {
-	// e.joinResultCh is for transmitting the join result chunks to the main
-	// thread.
-	e.joinResultCh = make(chan *hashjoinWorkerResult, e.concurrency+1)
-
-	e.probeSideTupleFetcher.hashJoinCtx = e.hashJoinCtx
 	// e.probeSideTupleFetcher.probeResultChs is for transmitting the chunks which store the data of
 	// probeSideExec, it'll be written by probe side worker goroutine, and read by join
 	// workers.
@@ -361,14 +356,18 @@ func (e *HashJoinExec) initializeForProbe() {
 		e.joinChkResourceCh[i] = make(chan *chunk.Chunk, 1)
 		e.joinChkResourceCh[i] <- newFirstChunk(e)
 	}
+
+	// e.joinResultCh is for transmitting the join result chunks to the main
+	// thread.
+	e.joinResultCh = make(chan *hashjoinWorkerResult, e.concurrency+1)
 }
 
 func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) {
 	e.initializeForProbe()
 	e.worker.RunWithRecover(func() {
 		defer trace.StartRegion(ctx, "HashJoinProbeSideFetcher").End()
-		e.probeSideTupleFetcher.fetchProbeSideChunks(ctx, e.maxChunkSize)
-	}, e.probeSideTupleFetcher.handleProbeSideFetcherPanic)
+		e.fetchProbeSideChunks(ctx)
+	}, e.handleProbeSideFetcherPanic)
 
 	probeKeyColIdx := make([]int, len(e.probeKeys))
 	probeNAKeColIdx := make([]int, len(e.probeNAKeys))
@@ -378,6 +377,7 @@ func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) {
 	for i := range e.probeNAKeys {
 		probeNAKeColIdx[i] = e.probeNAKeys[i].Index
 	}
+
 	for i := uint(0); i < e.concurrency; i++ {
 		workID := i
 		e.worker.RunWithRecover(func() {
@@ -388,12 +388,12 @@ func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) {
 	e.waiter.RunWithRecover(e.waitJoinWorkersAndCloseResultChan, nil)
 }
 
-func (fetcher *probeSideTupleFetcher) handleProbeSideFetcherPanic(r interface{}) {
-	for i := range fetcher.probeResultChs {
-		close(fetcher.probeResultChs[i])
+func (e *HashJoinExec) handleProbeSideFetcherPanic(r interface{}) {
+	for i := range e.probeSideTupleFetcher.probeResultChs {
+		close(e.probeSideTupleFetcher.probeResultChs[i])
 	}
 	if r != nil {
-		fetcher.joinResultCh <- &hashjoinWorkerResult{err: errors.Errorf("%v", r)}
+		e.joinResultCh <- &hashjoinWorkerResult{err: errors.Errorf("%v", r)}
 	}
 }
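
The field-access rewrites in the hunks above (e.g. `fetcher.closeCh` becoming `e.closeCh`, and `fetcher.probeChkResourceCh` becoming `e.probeSideTupleFetcher.probeChkResourceCh`) follow directly from the struct change: `probeSideTupleFetcher` no longer embeds a shared `*hashJoinCtx` and is instead embedded by value inside `HashJoinExec`. The following is a minimal, self-contained sketch with hypothetical stand-in types (not the real executor package) that shows only this access pattern:

```go
package main

import "fmt"

// probeSideTupleFetcher here is a stand-in: just the channel the sketch needs.
type probeSideTupleFetcher struct {
	probeChkResourceCh chan int
}

// hashJoinExec mirrors the post-revert layout: the executor owns the shared
// state (closeCh) directly, and the fetcher is embedded by value.
type hashJoinExec struct {
	closeCh               chan struct{}
	probeSideTupleFetcher // initialized in a literal as probeSideTupleFetcher{...}
}

func main() {
	e := hashJoinExec{
		closeCh:               make(chan struct{}),
		probeSideTupleFetcher: probeSideTupleFetcher{probeChkResourceCh: make(chan int, 1)},
	}
	// Shared state is reached as e.closeCh; fetcher-local state is reached
	// through the embedded value, as in the rewritten fetchProbeSideChunks.
	e.probeSideTupleFetcher.probeChkResourceCh <- 1
	fmt.Println(<-e.probeSideTupleFetcher.probeChkResourceCh, e.closeCh != nil)
}
```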
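The other recurring pattern in the diff is the probe/build handshake: `fetchProbeSideChunks` pulls a chunk from the probe side, and before dispatching the first one it calls `wait4BuildSide`, which blocks on either `closeCh` or `buildFinished`. Below is a stripped-down, hypothetical sketch of that select-based coordination (simplified signature and stand-in state, not the TiDB API):

```go
package main

import (
	"fmt"
	"time"
)

// wait4BuildSideSketch mirrors the select in wait4BuildSide: return early if the
// executor is closing, propagate a build error, and report an empty build side
// (which lets inner/semi joins finish immediately).
func wait4BuildSideSketch(closeCh chan struct{}, buildFinished chan error, buildRows int) (emptyBuild bool, err error) {
	select {
	case <-closeCh:
		return true, nil
	case err := <-buildFinished:
		if err != nil {
			return false, err
		}
	}
	return buildRows == 0, nil
}

func main() {
	closeCh := make(chan struct{})
	buildFinished := make(chan error, 1)

	// Pretend a build-side goroutine finishes its hash table after some work.
	go func() {
		time.Sleep(10 * time.Millisecond)
		buildFinished <- nil
	}()

	empty, err := wait4BuildSideSketch(closeCh, buildFinished, 42)
	fmt.Println(empty, err) // false <nil>: the probe side may start probing
}
```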