From 804ae149bd3ce851474d3aaf1c2cbfa0e32717e0 Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 7 Sep 2020 14:53:20 +0800 Subject: [PATCH 1/2] cherry pick #19721 to release-4.0 Signed-off-by: ti-srebot --- executor/hash_table.go | 2 +- executor/index_lookup_hash_join.go | 48 ++++++++++--- executor/index_lookup_join.go | 112 +++++++++++++++++++++++++++-- executor/join.go | 107 +++++++++++++++++++++++---- executor/join_test.go | 24 +++++++ 5 files changed, 264 insertions(+), 29 deletions(-) diff --git a/executor/hash_table.go b/executor/hash_table.go index f2857e8b8f9b5..20bddfa5c1a02 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -81,7 +81,7 @@ type hashStatistic struct { } func (s *hashStatistic) String() string { - return fmt.Sprintf("probe collision:%v, build:%v", s.probeCollision, s.buildTableElapse) + return fmt.Sprintf("probe_collision:%v, build:%v", s.probeCollision, s.buildTableElapse) } // hashRowContainer handles the rows and the hash map of a table. diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 93de69c6d5c51..87b232552ae3a 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -20,6 +20,8 @@ import ( "hash/fnv" "runtime/trace" "sync" + "sync/atomic" + "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -28,7 +30,6 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" ) @@ -66,6 +67,8 @@ type IndexNestedLoopHashJoin struct { curTask *indexHashJoinTask // taskCh is only used when `keepOuterOrder` is true. taskCh chan *indexHashJoinTask + + stats *indexLookUpJoinRuntimeStats } type indexHashJoinOuterWorker struct { @@ -144,12 +147,25 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error { e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) e.innerPtrBytes = make([][]byte, 0, 8) + if e.runtimeStats != nil { + e.stats = &indexLookUpJoinRuntimeStats{ + BasicRuntimeStats: e.runtimeStats, + } + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + } e.startWorkers(ctx) return nil } func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) { +<<<<<<< HEAD concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency +======= + concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency() + if e.stats != nil { + e.stats.concurrency = concurrency + } +>>>>>>> 221223b... 
executor: add more runtime information for join executor (#19721) workerCtx, cancelFunc := context.WithCancel(ctx) e.cancelFunc = cancelFunc innerCh := make(chan *indexHashJoinTask, concurrency) @@ -297,12 +313,6 @@ func (e *IndexNestedLoopHashJoin) Close() error { } e.taskCh = nil } - if e.runtimeStats != nil { - concurrency := cap(e.joinChkResourceCh) - runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats} - runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) - e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats) - } for i := range e.joinChkResourceCh { close(e.joinChkResourceCh[i]) } @@ -404,6 +414,10 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask, for _, ran := range e.indexRanges { copiedRanges = append(copiedRanges, ran.Clone()) } + var innerStats *innerWorkerRuntimeStats + if e.stats != nil { + innerStats = &e.stats.innerWorker + } iw := &indexHashJoinInnerWorker{ innerWorker: innerWorker{ innerCtx: e.innerCtx, @@ -412,6 +426,7 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask, executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize), indexRanges: copiedRanges, keyOff2IdxOff: e.keyOff2IdxOff, + stats: innerStats, }, taskCh: taskCh, joiner: e.joiners[workerID], @@ -510,6 +525,12 @@ func (iw *indexHashJoinInnerWorker) getNewJoinResult(ctx context.Context) (*inde } func (iw *indexHashJoinInnerWorker) buildHashTableForOuterResult(ctx context.Context, task *indexHashJoinTask, h hash.Hash64) { + if iw.stats != nil { + start := time.Now() + defer func() { + atomic.AddInt64(&iw.stats.build, int64(time.Since(start))) + }() + } buf, numChks := make([]byte, 1), task.outerResult.NumChunks() task.lookupMap = newRowHashMap(task.outerResult.Len()) for chkIdx := 0; chkIdx < numChks; chkIdx++ { @@ -547,7 +568,6 @@ func (iw *indexHashJoinInnerWorker) fetchInnerResults(ctx context.Context, task if err != nil { return err } - lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents) return iw.innerWorker.fetchInnerResults(ctx, task, lookUpContents) } @@ -559,6 +579,16 @@ func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{} } func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { + var joinStartTime time.Time + if iw.stats != nil { + start := time.Now() + defer func() { + endTime := time.Now() + atomic.AddInt64(&iw.stats.totalTime, int64(endTime.Sub(start))) + atomic.AddInt64(&iw.stats.join, int64(endTime.Sub(joinStartTime))) + }() + } + iw.wg = &sync.WaitGroup{} iw.wg.Add(1) // TODO(XuHuaiyu): we may always use the smaller side to build the hashtable. 
@@ -568,6 +598,8 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH return err } iw.wg.Wait() + + joinStartTime = time.Now() if !task.keepOuterOrder { return iw.doJoinUnordered(ctx, task, joinResult, h, resultCh) } diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 093de705d4814..e2a3acee0871a 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -14,12 +14,15 @@ package executor import ( + "bytes" "context" "runtime" "runtime/trace" "sort" + "strconv" "sync" "sync/atomic" + "time" "unsafe" "github.com/pingcap/errors" @@ -77,6 +80,8 @@ type IndexLookUpJoin struct { lastColHelper *plannercore.ColWithCmpFuncManager memTracker *memory.Tracker // track memory usage. + + stats *indexLookUpJoinRuntimeStats } type outerCtx struct { @@ -138,6 +143,7 @@ type innerWorker struct { indexRanges []*ranger.Range nextColCompareFilters *plannercore.ColWithCmpFuncManager keyOff2IdxOff []int + stats *innerWorkerRuntimeStats } // Open implements the Executor interface. @@ -171,12 +177,25 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error { e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) e.innerPtrBytes = make([][]byte, 0, 8) + if e.runtimeStats != nil { + e.stats = &indexLookUpJoinRuntimeStats{ + BasicRuntimeStats: e.runtimeStats, + } + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + } e.startWorkers(ctx) return nil } func (e *IndexLookUpJoin) startWorkers(ctx context.Context) { +<<<<<<< HEAD concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency +======= + concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency() + if e.stats != nil { + e.stats.concurrency = concurrency + } +>>>>>>> 221223b... 
executor: add more runtime information for join executor (#19721) resultCh := make(chan *lookUpJoinTask, concurrency) e.resultCh = resultCh workerCtx, cancelFunc := context.WithCancel(ctx) @@ -212,6 +231,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork copiedRanges = append(copiedRanges, ran.Clone()) } + var innerStats *innerWorkerRuntimeStats + if e.stats != nil { + innerStats = &e.stats.innerWorker + } iw := &innerWorker{ innerCtx: e.innerCtx, outerCtx: e.outerCtx, @@ -220,6 +243,7 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork executorChk: chunk.NewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize), indexRanges: copiedRanges, keyOff2IdxOff: e.keyOff2IdxOff, + stats: innerStats, } if e.lastColHelper != nil { // nextCwf.TmpConstant needs to be reset for every individual @@ -250,6 +274,7 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error { if task == nil { return nil } + startTime := time.Now() if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() { e.lookUpMatchedInners(task, task.cursor) e.innerIter = chunk.NewIterator4Slice(task.matchedInners) @@ -277,6 +302,9 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.Chunk) error { task.hasMatch = false task.hasNull = false } + if e.stats != nil { + atomic.AddInt64(&e.stats.probe, int64(time.Since(startTime))) + } if req.IsFull() { return nil } @@ -475,11 +503,16 @@ type indexJoinLookUpContent struct { } func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) error { + if iw.stats != nil { + start := time.Now() + defer func() { + atomic.AddInt64(&iw.stats.totalTime, int64(time.Since(start))) + }() + } lookUpContents, err := iw.constructLookupContent(task) if err != nil { return err } - lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents) err = iw.fetchInnerResults(ctx, task, lookUpContents) if err != nil { return err @@ -492,6 +525,13 @@ func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) err } func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoinLookUpContent, error) { + if iw.stats != nil { + start := time.Now() + defer func() { + atomic.AddInt64(&iw.stats.task, 1) + atomic.AddInt64(&iw.stats.construct, int64(time.Since(start))) + }() + } lookUpContents := make([]*indexJoinLookUpContent, 0, task.outerResult.Len()) keyBuf := make([]byte, 0, 64) for chkIdx := 0; chkIdx < task.outerResult.NumChunks(); chkIdx++ { @@ -531,6 +571,7 @@ func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoi for i := range task.encodedLookUpKeys { task.memTracker.Consume(task.encodedLookUpKeys[i].MemoryUsage()) } + lookUpContents = iw.sortAndDedupLookUpContents(lookUpContents) return lookUpContents, nil } @@ -612,6 +653,12 @@ func compareRow(sc *stmtctx.StatementContext, left, right []types.Datum) int { } func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTask, lookUpContent []*indexJoinLookUpContent) error { + if iw.stats != nil { + start := time.Now() + defer func() { + atomic.AddInt64(&iw.stats.fetch, int64(time.Since(start))) + }() + } innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters) if err != nil { return err @@ -641,6 +688,12 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa } func (iw *innerWorker) buildLookUpMap(task *lookUpJoinTask) error { + if iw.stats != nil { + 
start := time.Now() + defer func() { + atomic.AddInt64(&iw.stats.build, int64(time.Since(start))) + }() + } keyBuf := make([]byte, 0, 64) valBuf := make([]byte, 8) for i := 0; i < task.innerResult.NumChunks(); i++ { @@ -685,11 +738,56 @@ func (e *IndexLookUpJoin) Close() error { e.workerWg.Wait() e.memTracker = nil e.task = nil - if e.runtimeStats != nil { - concurrency := cap(e.resultCh) - runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats} - runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) - e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats) - } return e.baseExecutor.Close() } + +type indexLookUpJoinRuntimeStats struct { + *execdetails.BasicRuntimeStats + concurrency int + probe int64 + innerWorker innerWorkerRuntimeStats +} + +type innerWorkerRuntimeStats struct { + totalTime int64 + task int64 + construct int64 + fetch int64 + build int64 + join int64 +} + +func (e *indexLookUpJoinRuntimeStats) String() string { + buf := bytes.NewBuffer(make([]byte, 0, 16)) + if e.BasicRuntimeStats != nil { + buf.WriteString(e.BasicRuntimeStats.String()) + } + if e.innerWorker.totalTime > 0 { + buf.WriteString(", inner:{total:") + buf.WriteString(time.Duration(e.innerWorker.totalTime).String()) + buf.WriteString(", concurrency:") + if e.concurrency > 0 { + buf.WriteString(strconv.Itoa(e.concurrency)) + } else { + buf.WriteString("OFF") + } + buf.WriteString(", task:") + buf.WriteString(strconv.FormatInt(e.innerWorker.task, 10)) + buf.WriteString(", construct:") + buf.WriteString(time.Duration(e.innerWorker.construct).String()) + buf.WriteString(", fetch:") + buf.WriteString(time.Duration(e.innerWorker.fetch).String()) + buf.WriteString(", build:") + buf.WriteString(time.Duration(e.innerWorker.build).String()) + if e.innerWorker.join > 0 { + buf.WriteString(", join:") + buf.WriteString(time.Duration(e.innerWorker.join).String()) + } + buf.WriteString("}") + } + if e.probe > 0 { + buf.WriteString(", probe:") + buf.WriteString(time.Duration(e.probe).String()) + } + return buf.String() +} diff --git a/executor/join.go b/executor/join.go index 49c9bebc16103..9c34ad30c5988 100644 --- a/executor/join.go +++ b/executor/join.go @@ -14,11 +14,14 @@ package executor import ( + "bytes" "context" "fmt" "runtime/trace" + "strconv" "sync" "sync/atomic" + "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -85,6 +88,8 @@ type HashJoinExec struct { // joinWorkerWaitGroup is for sync multiple join workers. 
joinWorkerWaitGroup sync.WaitGroup finished atomic.Value + + stats *hashJoinRuntimeStats } // probeChkResource stores the result of the join probe side fetch worker, @@ -138,14 +143,8 @@ func (e *HashJoinExec) Close() error { } e.outerMatchedStatus = e.outerMatchedStatus[:0] - if e.runtimeStats != nil { - concurrency := cap(e.joiners) - runtimeStats := newJoinRuntimeStats(e.runtimeStats) - e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats) - runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) - if e.rowContainer != nil { - runtimeStats.setHashStat(e.rowContainer.stat) - } + if e.stats != nil && e.rowContainer != nil { + e.stats.hashStat = e.rowContainer.stat } err := e.baseExecutor.Close() return err @@ -174,6 +173,13 @@ func (e *HashJoinExec) Open(ctx context.Context) error { if e.buildTypes == nil { e.buildTypes = retTypes(e.buildSideExec) } + if e.runtimeStats != nil { + e.stats = &hashJoinRuntimeStats{ + BasicRuntimeStats: e.runtimeStats, + concurrent: cap(e.joiners), + } + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + } return nil } @@ -403,6 +409,17 @@ func (e *HashJoinExec) waitJoinWorkersAndCloseResultChan() { } func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) { + probeTime := int64(0) + if e.stats != nil { + start := time.Now() + defer func() { + t := time.Since(start) + atomic.AddInt64(&e.stats.probe, probeTime) + atomic.AddInt64(&e.stats.fetchAndProbe, int64(t)) + e.stats.setMaxFetchAndProbeTime(int64(t)) + }() + } + var ( probeSideResult *chunk.Chunk selected = make([]bool, 0, chunk.InitialCapacity) @@ -432,11 +449,13 @@ func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) { if !ok { break } + start := time.Now() if e.useOuterToBuild { ok, joinResult = e.join2ChunkForOuterHashJoin(workerID, probeSideResult, hCtx, joinResult) } else { ok, joinResult = e.join2Chunk(workerID, probeSideResult, hCtx, joinResult, selected) } + probeTime += int64(time.Since(start)) if !ok { break } @@ -645,6 +664,12 @@ func (e *HashJoinExec) handleFetchAndBuildHashTablePanic(r interface{}) { } func (e *HashJoinExec) fetchAndBuildHashTable(ctx context.Context) { + if e.stats != nil { + start := time.Now() + defer func() { + e.stats.fetchAndBuildHashTable = time.Since(start) + }() + } // buildSideResultCh transfers build side chunk from build side fetch to build hash table. 
buildSideResultCh := make(chan *chunk.Chunk, 1) doneCh := make(chan struct{}) @@ -933,16 +958,72 @@ func (e *joinRuntimeStats) setHashStat(hashStat hashStatistic) { } func (e *joinRuntimeStats) String() string { - result := e.RuntimeStatsWithConcurrencyInfo.String() + buf := bytes.NewBuffer(make([]byte, 0, 16)) + buf.WriteString(e.RuntimeStatsWithConcurrencyInfo.String()) if e.applyCache { if e.cache.useCache { - result += fmt.Sprintf(", cache:ON, cacheHitRatio:%.3f%%", e.cache.hitRatio*100) + buf.WriteString(fmt.Sprintf(", cache:ON, cacheHitRatio:%.3f%%", e.cache.hitRatio*100)) } else { - result += fmt.Sprintf(", cache:OFF") + buf.WriteString(fmt.Sprintf(", cache:OFF")) } } if e.hasHashStat { - result += ", " + e.hashStat.String() + buf.WriteString(", " + e.hashStat.String()) + } + return buf.String() +} + +type hashJoinRuntimeStats struct { + *execdetails.BasicRuntimeStats + + fetchAndBuildHashTable time.Duration + hashStat hashStatistic + fetchAndProbe int64 + probe int64 + concurrent int + maxFetchAndProbe int64 +} + +func (e *hashJoinRuntimeStats) setMaxFetchAndProbeTime(t int64) { + for { + value := atomic.LoadInt64(&e.maxFetchAndProbe) + if t <= value { + return + } + if atomic.CompareAndSwapInt64(&e.maxFetchAndProbe, value, t) { + return + } + } +} + +func (e *hashJoinRuntimeStats) String() string { + buf := bytes.NewBuffer(make([]byte, 0, 128)) + buf.WriteString(e.BasicRuntimeStats.String()) + if e.fetchAndBuildHashTable > 0 { + buf.WriteString(", build_hash_table:{total:") + buf.WriteString(e.fetchAndBuildHashTable.String()) + buf.WriteString(", fetch:") + buf.WriteString((e.fetchAndBuildHashTable - e.hashStat.buildTableElapse).String()) + buf.WriteString(", build:") + buf.WriteString(e.hashStat.buildTableElapse.String()) + buf.WriteString("}") + } + if e.probe > 0 { + buf.WriteString(", probe:{concurrency:") + buf.WriteString(strconv.Itoa(e.concurrent)) + buf.WriteString(", total:") + buf.WriteString(time.Duration(e.fetchAndProbe).String()) + buf.WriteString(", max:") + buf.WriteString(time.Duration(atomic.LoadInt64(&e.maxFetchAndProbe)).String()) + buf.WriteString(", probe:") + buf.WriteString(time.Duration(e.probe).String()) + buf.WriteString(", fetch:") + buf.WriteString(time.Duration(e.fetchAndProbe - e.probe).String()) + if e.hashStat.probeCollision > 0 { + buf.WriteString(", probe_collision:") + buf.WriteString(strconv.Itoa(e.hashStat.probeCollision)) + } + buf.WriteString("}") } - return result + return buf.String() } diff --git a/executor/join_test.go b/executor/join_test.go index 71837e651bb0b..7fb5bda1da1f8 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -2275,3 +2275,27 @@ func (s *testSuiteJoin3) TestIssue19500(c *C) { tk.MustQuery("select (select (select sum(c_int) from t3 where t3.c_str > t2.c_str) from t2 where t2.c_int > t1.c_int order by c_int limit 1) q from t1 order by q;"). Check(testkit.Rows("", "", "3", "3", "3")) } + +func (s *testSuiteJoinSerial) TestExplainAnalyzeJoin(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1,t2;") + tk.MustExec("create table t1 (a int, b int, unique index (a));") + tk.MustExec("create table t2 (a int, b int, unique index (a))") + tk.MustExec("insert into t1 values (1,1),(2,2),(3,3),(4,4),(5,5)") + tk.MustExec("insert into t2 values (1,1),(2,2),(3,3),(4,4),(5,5)") + // Test for index lookup join. 
+ rows := tk.MustQuery("explain analyze select /*+ INL_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows() + c.Assert(len(rows), Equals, 8) + c.Assert(rows[0][0], Matches, "IndexJoin_.*") + c.Assert(rows[0][5], Matches, "time:.*, loops:.*, inner:{total:.*, concurrency:.*, task:.*, construct:.*, fetch:.*, build:.*}, probe:.*") + // Test for index lookup hash join. + rows = tk.MustQuery("explain analyze select /*+ INL_HASH_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows() + c.Assert(len(rows), Equals, 8) + c.Assert(rows[0][0], Matches, "IndexHashJoin.*") + c.Assert(rows[0][5], Matches, "time:.*, loops:.*, inner:{total:.*, concurrency:.*, task:.*, construct:.*, fetch:.*, build:.*, join:.*}") + // Test for hash join. + rows = tk.MustQuery("explain analyze select /*+ HASH_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows() + c.Assert(len(rows), Equals, 7) + c.Assert(rows[0][0], Matches, "HashJoin.*") + c.Assert(rows[0][5], Matches, "time:.*, loops:.*, build_hash_table:{total:.*, fetch:.*, build:.*}, probe:{concurrency:5, total:.*, max:.*, probe:.*, fetch:.*}") +} From 08bed1cfabeed3ded800371d92396d82586a0608 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Fri, 18 Sep 2020 12:59:28 +0800 Subject: [PATCH 2/2] fix conflict Signed-off-by: crazycs520 --- executor/index_lookup_hash_join.go | 4 ---- executor/index_lookup_join.go | 4 ---- 2 files changed, 8 deletions(-) diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 87b232552ae3a..f85bb60d34835 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -158,14 +158,10 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error { } func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) { -<<<<<<< HEAD concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency -======= - concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency() if e.stats != nil { e.stats.concurrency = concurrency } ->>>>>>> 221223b... executor: add more runtime information for join executor (#19721) workerCtx, cancelFunc := context.WithCancel(ctx) e.cancelFunc = cancelFunc innerCh := make(chan *indexHashJoinTask, concurrency) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index e2a3acee0871a..e243f1097e2cc 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -188,14 +188,10 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error { } func (e *IndexLookUpJoin) startWorkers(ctx context.Context) { -<<<<<<< HEAD concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency -======= - concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency() if e.stats != nil { e.stats.concurrency = concurrency } ->>>>>>> 221223b... executor: add more runtime information for join executor (#19721) resultCh := make(chan *lookUpJoinTask, concurrency) e.resultCh = resultCh workerCtx, cancelFunc := context.WithCancel(ctx)
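Note (not part of the patch series): the per-worker timing pattern this change applies throughout, accumulating elapsed time with atomic.AddInt64 and tracking the slowest worker with a compare-and-swap loop as in setMaxFetchAndProbeTime, reduces to a small standalone sketch. The workerStats type and record helper below are illustrative names only, not identifiers from the TiDB tree; this is a sketch of the idea under those assumptions, not the executor code itself.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// workerStats mirrors the style of hashJoinRuntimeStats / innerWorkerRuntimeStats:
// totals are accumulated with atomic adds; the slowest worker is kept via a CAS loop.
// (Illustrative type, not from the TiDB source.)
type workerStats struct {
	totalTime int64 // summed nanoseconds across all workers
	maxTime   int64 // nanoseconds spent by the slowest single worker
}

// record adds one worker's elapsed time to the total and raises the max if needed.
func (s *workerStats) record(d time.Duration) {
	atomic.AddInt64(&s.totalTime, int64(d))
	for {
		old := atomic.LoadInt64(&s.maxTime)
		if int64(d) <= old {
			return
		}
		if atomic.CompareAndSwapInt64(&s.maxTime, old, int64(d)) {
			return
		}
	}
}

func main() {
	var stats workerStats
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			start := time.Now()
			time.Sleep(time.Duration(n+1) * time.Millisecond) // stand-in for probe work
			stats.record(time.Since(start))
		}(i)
	}
	wg.Wait()
	fmt.Printf("total:%v, max:%v\n",
		time.Duration(atomic.LoadInt64(&stats.totalTime)),
		time.Duration(atomic.LoadInt64(&stats.maxTime)))
}

Run standalone, this prints a total close to the sum of the simulated worker times and a max close to the slowest one, which is the same total/max split that the patch's hashJoinRuntimeStats String() method reports for the probe workers in EXPLAIN ANALYZE output.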