From 2610521ee358b2da2a8aaa7213015f19ff6b5447 Mon Sep 17 00:00:00 2001 From: zyguan Date: Fri, 13 Dec 2024 09:49:03 +0800 Subject: [PATCH] executor: run idxlookup workers in a pool (#58033) ref pingcap/tidb#56649 --- pkg/executor/distsql.go | 145 ++++++++++++++++++------------------- pkg/executor/utils.go | 69 ++++++++++++++++++ pkg/executor/utils_test.go | 94 ++++++++++++++++++++++++ 3 files changed, 232 insertions(+), 76 deletions(-) diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index 1832913b6001e..b3db4cd3e085f 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -21,6 +21,7 @@ import ( "runtime/trace" "slices" "sort" + "strconv" "strings" "sync" "sync/atomic" @@ -76,6 +77,7 @@ var LookupTableTaskChannelSize int32 = 50 // lookupTableTask is created from a partial result of an index request which // contains the handles in those index keys. type lookupTableTask struct { + id int handles []kv.Handle rowIdx []int // rowIdx represents the handle index for every row. Only used when keep order. rows []chunk.Row @@ -503,6 +505,8 @@ type IndexLookUpExecutor struct { // cancelFunc is called when close the executor cancelFunc context.CancelFunc + workerCtx context.Context + pool *workerPool // If dummy flag is set, this is not a real IndexLookUpReader, it just provides the KV ranges for UnionScan. // Used by the temporary table, cached table. @@ -635,15 +639,16 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error { } func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize int) error { - // indexWorker will write to workCh and tableWorker will read from workCh, + // indexWorker will submit lookup-table tasks (processed by tableWorker) to the pool, // so fetching index and getting table data can run concurrently. - ctx, cancel := context.WithCancel(ctx) - e.cancelFunc = cancel - workCh := make(chan *lookupTableTask, 1) - if err := e.startIndexWorker(ctx, workCh, initBatchSize); err != nil { + e.workerCtx, e.cancelFunc = context.WithCancel(ctx) + e.pool = &workerPool{ + TolerablePendingTasks: 1, + MaxWorkers: int32(max(1, e.indexLookupConcurrency)), + } + if err := e.startIndexWorker(ctx, initBatchSize); err != nil { return err } - e.startTableWorker(ctx, workCh) e.workerStarted = true return nil } @@ -703,8 +708,8 @@ func (e *IndexLookUpExecutor) getRetTpsForIndexReader() []*types.FieldType { return tps } -// startIndexWorker launch a background goroutine to fetch handles, send the results to workCh. -func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<- *lookupTableTask, initBatchSize int) error { +// startIndexWorker launch a background goroutine to fetch handles, submit lookup-table tasks to the pool. 
+func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, initBatchSize int) error { if e.RuntimeStats() != nil { collExec := true e.dagPB.CollectExecutionSummaries = &collExec @@ -725,11 +730,10 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< tps := e.getRetTpsForIndexReader() idxID := e.getIndexPlanRootID() e.idxWorkerWg.Add(1) - go func() { - defer trace.StartRegion(ctx, "IndexLookUpIndexWorker").End() + e.pool.submit(func() { + defer trace.StartRegion(ctx, "IndexLookUpIndexTask").End() worker := &indexWorker{ idxLookup: e, - workCh: workCh, finished: e.finished, resultCh: e.resultCh, keepOrder: e.keepOrder, @@ -805,10 +809,9 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< logutil.Logger(ctx).Error("close Select result failed", zap.Error(err)) } } - close(workCh) close(e.resultCh) e.idxWorkerWg.Done() - }() + }) return nil } @@ -841,32 +844,6 @@ func CalculateBatchSize(estRows, initBatchSize, maxBatchSize int) int { return batchSize } -// startTableWorker launches some background goroutines which pick tasks from workCh and execute the task. -func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) { - lookupConcurrencyLimit := e.indexLookupConcurrency - e.tblWorkerWg.Add(lookupConcurrencyLimit) - for i := 0; i < lookupConcurrencyLimit; i++ { - workerID := i - worker := &tableWorker{ - idxLookup: e, - workCh: workCh, - finished: e.finished, - keepOrder: e.keepOrder, - handleIdx: e.handleIdx, - checkIndexValue: e.checkIndexValue, - memTracker: memory.NewTracker(workerID, -1), - } - worker.memTracker.AttachTo(e.memTracker) - ctx1, cancel := context.WithCancel(ctx) - go func() { - defer trace.StartRegion(ctx1, "IndexLookUpTableWorker").End() - worker.pickAndExecTask(ctx1) - cancel() - e.tblWorkerWg.Done() - }() - } -} - func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookupTableTask) (*TableReaderExecutor, error) { table := e.table if e.partitionTableMode && task.partitionTable != nil { @@ -1029,7 +1006,6 @@ func (e *IndexLookUpExecutor) getTableRootPlanID() int { // indexWorker is used by IndexLookUpExecutor to maintain index lookup background goroutines. type indexWorker struct { idxLookup *IndexLookUpExecutor - workCh chan<- *lookupTableTask finished <-chan struct{} resultCh chan<- *lookupTableTask keepOrder bool @@ -1056,7 +1032,7 @@ func (w *indexWorker) syncErr(err error) { } // fetchHandles fetches a batch of handles from index data and builds the index lookup tasks. -// The tasks are sent to workCh to be further processed by tableWorker, and sent to e.resultCh +// The tasks are submitted to the pool and processed by tableWorker, and sent to e.resultCh // at the same time to keep data ordered. 
func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.SelectResult) (err error) { defer func() { @@ -1076,6 +1052,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select w.idxLookup.stats.indexScanBasicStats = w.idxLookup.stmtRuntimeStatsColl.GetBasicRuntimeStats(idxID, true) } } + taskID := 0 for i := 0; i < len(results); { result := results[i] if w.PushedLimit != nil && w.scannedKeys >= w.PushedLimit.Count+w.PushedLimit.Offset { @@ -1093,6 +1070,8 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select continue } task := w.buildTableTask(handles, retChunk) + task.id = taskID + taskID++ finishBuild := time.Now() if w.idxLookup.partitionTableMode { task.partitionTable = w.idxLookup.prunedPartitions[i] @@ -1102,7 +1081,18 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select return nil case <-w.finished: return nil - case w.workCh <- task: + default: + e := w.idxLookup + e.tblWorkerWg.Add(1) + e.pool.submit(func() { + defer e.tblWorkerWg.Done() + select { + case <-e.finished: + return + default: + execTableTask(e, task) + } + }) w.resultCh <- task } if w.idxLookup.stats != nil { @@ -1225,10 +1215,46 @@ func (w *indexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk) * return task } +func execTableTask(e *IndexLookUpExecutor, task *lookupTableTask) { + var ( + ctx = e.workerCtx + region *trace.Region + ) + if trace.IsEnabled() { + region = trace.StartRegion(ctx, "IndexLookUpTableTask"+strconv.Itoa(task.id)) + } + defer func() { + if r := recover(); r != nil { + logutil.Logger(ctx).Error("TableWorker in IndexLookUpExecutor panicked", zap.Any("recover", r), zap.Stack("stack")) + err := util.GetRecoverError(r) + task.doneCh <- err + } + if region != nil { + region.End() + } + }() + tracker := memory.NewTracker(task.id, -1) + tracker.AttachTo(e.memTracker) + w := &tableWorker{ + idxLookup: e, + finished: e.finished, + keepOrder: e.keepOrder, + handleIdx: e.handleIdx, + checkIndexValue: e.checkIndexValue, + memTracker: tracker, + } + startTime := time.Now() + err := w.executeTask(ctx, task) + if e.stats != nil { + atomic.AddInt64(&e.stats.TableRowScan, int64(time.Since(startTime))) + atomic.AddInt64(&e.stats.TableTaskNum, 1) + } + task.doneCh <- err +} + // tableWorker is used by IndexLookUpExecutor to maintain table lookup background goroutines. type tableWorker struct { idxLookup *IndexLookUpExecutor - workCh <-chan *lookupTableTask finished <-chan struct{} keepOrder bool handleIdx []int @@ -1240,39 +1266,6 @@ type tableWorker struct { *checkIndexValue } -// pickAndExecTask picks tasks from workCh, and execute them. -func (w *tableWorker) pickAndExecTask(ctx context.Context) { - var task *lookupTableTask - var ok bool - defer func() { - if r := recover(); r != nil { - logutil.Logger(ctx).Error("tableWorker in IndexLookUpExecutor panicked", zap.Any("recover", r), zap.Stack("stack")) - err := util.GetRecoverError(r) - task.doneCh <- err - } - }() - for { - // Don't check ctx.Done() on purpose. If background worker get the signal and all - // exit immediately, session's goroutine doesn't know this and still calling Next(), - // it may block reading task.doneCh forever. 
- select { - case task, ok = <-w.workCh: - if !ok { - return - } - case <-w.finished: - return - } - startTime := time.Now() - err := w.executeTask(ctx, task) - if w.idxLookup.stats != nil { - atomic.AddInt64(&w.idxLookup.stats.TableRowScan, int64(time.Since(startTime))) - atomic.AddInt64(&w.idxLookup.stats.TableTaskNum, 1) - } - task.doneCh <- err - } -} - func (e *IndexLookUpExecutor) getHandle(row chunk.Row, handleIdx []int, isCommonHandle bool, tp getHandleType) (handle kv.Handle, err error) { if isCommonHandle { diff --git a/pkg/executor/utils.go b/pkg/executor/utils.go index 6b650a847b58d..abd2cb7c060d7 100644 --- a/pkg/executor/utils.go +++ b/pkg/executor/utils.go @@ -16,6 +16,8 @@ package executor import ( "strings" + "sync" + "sync/atomic" "github.com/pingcap/tidb/pkg/extension" "github.com/pingcap/tidb/pkg/parser/ast" @@ -119,3 +121,70 @@ func encodePassword(u *ast.UserSpec, authPlugin *extension.AuthPlugin) (string, } return u.EncodedPassword() } + +var taskPool = sync.Pool{ + New: func() any { return &workerTask{} }, +} + +type workerTask struct { + f func() + next *workerTask +} + +type workerPool struct { + lock sync.Mutex + head *workerTask + tail *workerTask + + tasks atomic.Int32 + workers atomic.Int32 + + // TolerablePendingTasks is the number of tasks that can be tolerated in the queue, that is, the pool won't spawn a + // new goroutine if the number of tasks is less than this number. + TolerablePendingTasks int32 + // MaxWorkers is the maximum number of workers that the pool can spawn. + MaxWorkers int32 +} + +func (p *workerPool) submit(f func()) { + task := taskPool.Get().(*workerTask) + task.f, task.next = f, nil + p.lock.Lock() + if p.head == nil { + p.head = task + } else { + p.tail.next = task + } + p.tail = task + p.lock.Unlock() + tasks := p.tasks.Add(1) + + if workers := p.workers.Load(); workers == 0 || (workers < p.MaxWorkers && tasks > p.TolerablePendingTasks) { + p.workers.Add(1) + go p.run() + } +} + +func (p *workerPool) run() { + for { + var task *workerTask + + p.lock.Lock() + if p.head != nil { + task, p.head = p.head, p.head.next + if p.head == nil { + p.tail = nil + } + } + p.lock.Unlock() + + if task == nil { + p.workers.Add(-1) + return + } + p.tasks.Add(-1) + + task.f() + taskPool.Put(task) + } +} diff --git a/pkg/executor/utils_test.go b/pkg/executor/utils_test.go index de03f66e5993c..0bcfc2d84362e 100644 --- a/pkg/executor/utils_test.go +++ b/pkg/executor/utils_test.go @@ -15,6 +15,8 @@ package executor import ( + "runtime" + "sync" "testing" "github.com/pingcap/errors" @@ -177,3 +179,95 @@ func TestEncodePasswordWithPlugin(t *testing.T) { require.True(t, ok) require.Equal(t, "", pwd) } + +func TestGoPool(t *testing.T) { + var ( + list []int + lock sync.Mutex + ) + push := func(i int) { + lock.Lock() + list = append(list, i) + lock.Unlock() + } + clean := func() { + lock.Lock() + list = list[:0] + lock.Unlock() + } + + t.Run("SingleWorker", func(t *testing.T) { + clean() + pool := &workerPool{ + TolerablePendingTasks: 0, + MaxWorkers: 1, + } + wg := sync.WaitGroup{} + wg.Add(1) + pool.submit(func() { + push(1) + wg.Add(1) + pool.submit(func() { + push(3) + runtime.Gosched() + push(4) + wg.Done() + }) + runtime.Gosched() + push(2) + wg.Done() + }) + wg.Wait() + require.Equal(t, []int{1, 2, 3, 4}, list) + }) + + t.Run("TwoWorkers", func(t *testing.T) { + clean() + pool := &workerPool{ + TolerablePendingTasks: 0, + MaxWorkers: 2, + } + wg := sync.WaitGroup{} + wg.Add(1) + pool.submit(func() { + push(1) + wg.Add(1) + pool.submit(func() { + push(3) + 
runtime.Gosched() + push(4) + wg.Done() + }) + runtime.Gosched() + push(2) + wg.Done() + }) + wg.Wait() + require.Equal(t, []int{1, 3, 2, 4}, list) + }) + + t.Run("TolerateOnePendingTask", func(t *testing.T) { + clean() + pool := &workerPool{ + TolerablePendingTasks: 1, + MaxWorkers: 2, + } + wg := sync.WaitGroup{} + wg.Add(1) + pool.submit(func() { + push(1) + wg.Add(1) + pool.submit(func() { + push(3) + runtime.Gosched() + push(4) + wg.Done() + }) + runtime.Gosched() + push(2) + wg.Done() + }) + wg.Wait() + require.Equal(t, []int{1, 2, 3, 4}, list) + }) +}
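
A note on the new workerPool in pkg/executor/utils.go: the scale-up rule in submit is easy to miss on first read. A new worker goroutine is started only when no worker is running at all, or when the backlog exceeds TolerablePendingTasks and the worker count is still below MaxWorkers (the executor sets TolerablePendingTasks to 1 and MaxWorkers to max(1, indexLookupConcurrency)). The sketch below isolates that decision as a standalone function; needNewWorker and the surrounding main are illustrative names, not part of the patch.

package main

import "fmt"

// needNewWorker mirrors the condition used in workerPool.submit:
// spawn when the pool is idle, or when the backlog (counted after the
// new task is enqueued) is larger than the tolerated number of pending
// tasks and there is still headroom below MaxWorkers.
func needNewWorker(workers, pendingTasks, tolerablePending, maxWorkers int32) bool {
	return workers == 0 || (workers < maxWorkers && pendingTasks > tolerablePending)
}

func main() {
	// With TolerablePendingTasks=1 and MaxWorkers=2:
	fmt.Println(needNewWorker(0, 1, 1, 2)) // true: no worker yet, always start one
	fmt.Println(needNewWorker(1, 1, 1, 2)) // false: a single pending task is tolerated
	fmt.Println(needNewWorker(1, 2, 1, 2)) // true: backlog above the threshold, still below MaxWorkers
	fmt.Println(needNewWorker(2, 5, 1, 2)) // false: already at MaxWorkers
}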
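
A second point worth spelling out is how the patch keeps rows ordered after dropping the fixed set of table workers reading from workCh. Each batch of handles becomes a lookupTableTask that is both submitted to the pool (where it may finish out of order) and sent to resultCh in fetch order; the reader consumes resultCh in order and blocks on each task's doneCh, so results still stream back in index order. The following is a minimal, self-contained model of that pattern under simplified assumptions: the task type, the inline goroutine standing in for pool.submit, and the channel sizes are made up for illustration, and the cancellation (finished channel) and error paths of the real executor are omitted.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// task models lookupTableTask: the producer assigns id in fetch order,
// a pooled worker fills result and signals completion on doneCh.
type task struct {
	id     int
	result string
	doneCh chan error
}

func main() {
	resultCh := make(chan *task, 4)
	var wg sync.WaitGroup

	// Producer: build tasks in order, hand each one to a worker goroutine,
	// and also push it to resultCh so the consumer sees tasks in fetch order.
	go func() {
		for i := 0; i < 4; i++ {
			t := &task{id: i, doneCh: make(chan error, 1)}
			wg.Add(1)
			go func(t *task) { // stands in for pool.submit(...)
				defer wg.Done()
				time.Sleep(time.Duration(rand.Intn(20)) * time.Millisecond) // finish out of order
				t.result = fmt.Sprintf("rows of task %d", t.id)
				t.doneCh <- nil
			}(t)
			resultCh <- t
		}
		close(resultCh)
	}()

	// Consumer: read tasks in submission order and wait on each doneCh, so
	// output order matches index order even though execution order varies.
	for t := range resultCh {
		if err := <-t.doneCh; err == nil {
			fmt.Println(t.result)
		}
	}
	wg.Wait()
}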
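
Finally, execTableTask only builds the "IndexLookUpTableTask"+strconv.Itoa(task.id) region name when trace.IsEnabled() reports an active execution trace, so the per-task string concatenation is skipped in the common untraced case. A stripped-down sketch of that shape, assuming a placeholder work function in place of tableWorker.executeTask (runTask is an illustrative helper, not code from the patch):

package main

import (
	"context"
	"runtime/trace"
	"strconv"
)

// runTask wraps a unit of work in a named trace region, but only pays for
// building the region name when tracing is actually enabled.
func runTask(ctx context.Context, id int, work func()) {
	var region *trace.Region
	if trace.IsEnabled() {
		region = trace.StartRegion(ctx, "IndexLookUpTableTask"+strconv.Itoa(id))
	}
	defer func() {
		if region != nil {
			region.End()
		}
	}()
	work()
}

func main() {
	runTask(context.Background(), 7, func() {
		// placeholder for the real per-task table lookup
	})
}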