From df938cd4879a6ca8eb7486df61f8ad6fea521bee Mon Sep 17 00:00:00 2001 From: zyguan <zhongyangguan@gmail.com> Date: Thu, 5 Dec 2024 13:28:44 +0000 Subject: [PATCH 1/5] executor: run idxlookup workers in a pool Signed-off-by: zyguan <zhongyangguan@gmail.com> --- pkg/executor/distsql.go | 138 ++++++++++++++++++---------------------- pkg/executor/utils.go | 66 +++++++++++++++++++ 2 files changed, 129 insertions(+), 75 deletions(-) diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index 38d288555caf1..2cc83a6724aa9 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -503,6 +503,8 @@ type IndexLookUpExecutor struct { // cancelFunc is called when close the executor cancelFunc context.CancelFunc + workerCtx context.Context + pool gopool // If dummy flag is set, this is not a real IndexLookUpReader, it just provides the KV ranges for UnionScan. // Used by the temporary table, cached table. @@ -635,15 +637,14 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error { } func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize int) error { - // indexWorker will write to workCh and tableWorker will read from workCh, + // indexWorker will submit lookup-table tasks (processed by tableWorker) to the pool, // so fetching index and getting table data can run concurrently. - ctx, cancel := context.WithCancel(ctx) - e.cancelFunc = cancel - workCh := make(chan *lookupTableTask, 1) - if err := e.startIndexWorker(ctx, workCh, initBatchSize); err != nil { + e.workerCtx, e.cancelFunc = context.WithCancel(ctx) + e.pool.TolerablePendingTasks = 1 + e.pool.MaxWorkers = int32(max(1, e.indexLookupConcurrency)) + if err := e.startIndexWorker(ctx, initBatchSize); err != nil { return err } - e.startTableWorker(ctx, workCh) e.workerStarted = true return nil } @@ -703,8 +704,8 @@ func (e *IndexLookUpExecutor) getRetTpsForIndexReader() []*types.FieldType { return tps } -// startIndexWorker launch a background goroutine to fetch handles, send the results to workCh. -func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<- *lookupTableTask, initBatchSize int) error { +// startIndexWorker launch a background goroutine to fetch handles, submit lookup-table tasks to the pool. +func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, initBatchSize int) error { if e.RuntimeStats() != nil { collExec := true e.dagPB.CollectExecutionSummaries = &collExec @@ -725,11 +726,10 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< tps := e.getRetTpsForIndexReader() idxID := e.getIndexPlanRootID() e.idxWorkerWg.Add(1) - go func() { + e.pool.submit(func() { defer trace.StartRegion(ctx, "IndexLookUpIndexWorker").End() worker := &indexWorker{ idxLookup: e, - workCh: workCh, finished: e.finished, resultCh: e.resultCh, keepOrder: e.keepOrder, @@ -805,10 +805,9 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< logutil.Logger(ctx).Error("close Select result failed", zap.Error(err)) } } - close(workCh) close(e.resultCh) e.idxWorkerWg.Done() - }() + }) return nil } @@ -841,32 +840,6 @@ func CalculateBatchSize(estRows, initBatchSize, maxBatchSize int) int { return batchSize } -// startTableWorker launches some background goroutines which pick tasks from workCh and execute the task. 
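The structural change in this first patch is that the bounded workCh channel and the fixed set of table-worker goroutines disappear: the index worker now hands each lookup-table task straight to a shared pool, while resultCh continues to receive tasks in index order. The sketch below is a minimal, self-contained illustration — not TiDB code; task, resultCh, and doneCh only mirror the patch's names — of why keepOrder still holds: execution order on the pool is unspecified, but the consumer drains resultCh in submission order and blocks on each task's doneCh.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type task struct {
	id     int
	rows   []int
	doneCh chan error
}

func main() {
	resultCh := make(chan *task, 4)

	// Producer: plays the role of indexWorker.fetchHandles.
	go func() {
		for i := 0; i < 4; i++ {
			t := &task{id: i, doneCh: make(chan error, 1)}
			// Stands in for pool.submit: tasks may finish in any order.
			go func() {
				time.Sleep(time.Duration(rand.Intn(20)) * time.Millisecond)
				t.rows = []int{t.id * 10}
				t.doneCh <- nil
			}()
			resultCh <- t // sent in index order, before the task finishes
		}
		close(resultCh)
	}()

	// Consumer: plays the role of IndexLookUpExecutor.Next.
	for t := range resultCh {
		if err := <-t.doneCh; err == nil {
			fmt.Println(t.id, t.rows) // always 0, 1, 2, 3 despite random execution
		}
	}
}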
-func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) { - lookupConcurrencyLimit := e.indexLookupConcurrency - e.tblWorkerWg.Add(lookupConcurrencyLimit) - for i := 0; i < lookupConcurrencyLimit; i++ { - workerID := i - worker := &tableWorker{ - idxLookup: e, - workCh: workCh, - finished: e.finished, - keepOrder: e.keepOrder, - handleIdx: e.handleIdx, - checkIndexValue: e.checkIndexValue, - memTracker: memory.NewTracker(workerID, -1), - } - worker.memTracker.AttachTo(e.memTracker) - ctx1, cancel := context.WithCancel(ctx) - go func() { - defer trace.StartRegion(ctx1, "IndexLookUpTableWorker").End() - worker.pickAndExecTask(ctx1) - cancel() - e.tblWorkerWg.Done() - }() - } -} - func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookupTableTask) (*TableReaderExecutor, error) { table := e.table if e.partitionTableMode && task.partitionTable != nil { @@ -1029,7 +1002,6 @@ func (e *IndexLookUpExecutor) getTableRootPlanID() int { // indexWorker is used by IndexLookUpExecutor to maintain index lookup background goroutines. type indexWorker struct { idxLookup *IndexLookUpExecutor - workCh chan<- *lookupTableTask finished <-chan struct{} resultCh chan<- *lookupTableTask keepOrder bool @@ -1056,7 +1028,7 @@ func (w *indexWorker) syncErr(err error) { } // fetchHandles fetches a batch of handles from index data and builds the index lookup tasks. -// The tasks are sent to workCh to be further processed by tableWorker, and sent to e.resultCh +// The tasks are submitted to the pool and processed by tableWorker, and sent to e.resultCh // at the same time to keep data ordered. func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.SelectResult) (err error) { defer func() { @@ -1076,6 +1048,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select w.idxLookup.stats.indexScanBasicStats = w.idxLookup.stmtRuntimeStatsColl.GetBasicRuntimeStats(idxID, true) } } + taskID := 0 for i := 0; i < len(results); { result := results[i] if w.PushedLimit != nil && w.scannedKeys >= w.PushedLimit.Count+w.PushedLimit.Offset { @@ -1093,6 +1066,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select continue } task := w.buildTableTask(handles, retChunk) + taskID++ finishBuild := time.Now() if w.idxLookup.partitionTableMode { task.partitionTable = w.idxLookup.prunedPartitions[i] @@ -1102,7 +1076,20 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select return nil case <-w.finished: return nil - case w.workCh <- task: + default: + e := w.idxLookup + e.tblWorkerWg.Add(1) + e.pool.submit(func() { + defer e.tblWorkerWg.Done() + select { + case <-e.finished: + return + default: + tracker := memory.NewTracker(taskID, -1) + tracker.AttachTo(e.memTracker) + execTableTask(e, task, tracker) + } + }) w.resultCh <- task } if w.idxLookup.stats != nil { @@ -1223,10 +1210,44 @@ func (w *indexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk) * return task } +func execTableTask(e *IndexLookUpExecutor, task *lookupTableTask, tracker *memory.Tracker) { + var ( + ctx = e.workerCtx + region *trace.Region + ) + if trace.IsEnabled() { + region = trace.StartRegion(ctx, "IndexLookUpTableTask") + } + defer func() { + if r := recover(); r != nil { + logutil.Logger(ctx).Error("TableWorker in IndexLookUpExecutor panicked", zap.Any("recover", r), zap.Stack("stack")) + err := util.GetRecoverError(r) + task.doneCh <- err + } + if region != nil { + region.End() + } + }() + w 
:= &tableWorker{ + idxLookup: e, + finished: e.finished, + keepOrder: e.keepOrder, + handleIdx: e.handleIdx, + checkIndexValue: e.checkIndexValue, + memTracker: tracker, + } + startTime := time.Now() + err := w.executeTask(ctx, task) + if e.stats != nil { + atomic.AddInt64(&e.stats.TableRowScan, int64(time.Since(startTime))) + atomic.AddInt64(&e.stats.TableTaskNum, 1) + } + task.doneCh <- err +} + // tableWorker is used by IndexLookUpExecutor to maintain table lookup background goroutines. type tableWorker struct { idxLookup *IndexLookUpExecutor - workCh <-chan *lookupTableTask finished <-chan struct{} keepOrder bool handleIdx []int @@ -1238,39 +1259,6 @@ type tableWorker struct { *checkIndexValue } -// pickAndExecTask picks tasks from workCh, and execute them. -func (w *tableWorker) pickAndExecTask(ctx context.Context) { - var task *lookupTableTask - var ok bool - defer func() { - if r := recover(); r != nil { - logutil.Logger(ctx).Error("tableWorker in IndexLookUpExecutor panicked", zap.Any("recover", r), zap.Stack("stack")) - err := util.GetRecoverError(r) - task.doneCh <- err - } - }() - for { - // Don't check ctx.Done() on purpose. If background worker get the signal and all - // exit immediately, session's goroutine doesn't know this and still calling Next(), - // it may block reading task.doneCh forever. - select { - case task, ok = <-w.workCh: - if !ok { - return - } - case <-w.finished: - return - } - startTime := time.Now() - err := w.executeTask(ctx, task) - if w.idxLookup.stats != nil { - atomic.AddInt64(&w.idxLookup.stats.TableRowScan, int64(time.Since(startTime))) - atomic.AddInt64(&w.idxLookup.stats.TableTaskNum, 1) - } - task.doneCh <- err - } -} - func (e *IndexLookUpExecutor) getHandle(row chunk.Row, handleIdx []int, isCommonHandle bool, tp getHandleType) (handle kv.Handle, err error) { if isCommonHandle { diff --git a/pkg/executor/utils.go b/pkg/executor/utils.go index 6b650a847b58d..9cd1ddbdb85ab 100644 --- a/pkg/executor/utils.go +++ b/pkg/executor/utils.go @@ -16,6 +16,8 @@ package executor import ( "strings" + "sync" + "sync/atomic" "github.com/pingcap/tidb/pkg/extension" "github.com/pingcap/tidb/pkg/parser/ast" @@ -119,3 +121,67 @@ func encodePassword(u *ast.UserSpec, authPlugin *extension.AuthPlugin) (string, } return u.EncodedPassword() } + +var taskPool = sync.Pool{ + New: func() any { return &gotask{} }, +} + +type gotask struct { + f func() + next *gotask +} + +type gopool struct { + lock sync.Mutex + head *gotask + tail *gotask + + tasks atomic.Int32 + workers atomic.Int32 + + TolerablePendingTasks int32 + MaxWorkers int32 +} + +func (p *gopool) submit(f func()) { + task := taskPool.Get().(*gotask) + task.f, task.next = f, nil + p.lock.Lock() + if p.head == nil { + p.head = task + } else { + p.tail.next = task + } + p.tail = task + p.lock.Unlock() + tasks := p.tasks.Add(1) + + if workers := p.workers.Load(); workers == 0 || (workers < p.MaxWorkers && tasks > p.TolerablePendingTasks) { + p.workers.Add(1) + go p.run() + } +} + +func (p *gopool) run() { + for { + var task *gotask + + p.lock.Lock() + if p.head != nil { + task, p.head = p.head, p.head.next + if p.head == nil { + p.tail = nil + } + } + p.lock.Unlock() + + if task == nil { + p.workers.Add(-1) + return + } + p.tasks.Add(-1) + + task.f() + taskPool.Put(task) + } +} From a6d43070553b2486144fb509e68d352887659a21 Mon Sep 17 00:00:00 2001 From: zyguan <zhongyangguan@gmail.com> Date: Fri, 6 Dec 2024 08:15:47 +0000 Subject: [PATCH 2/5] executor: add basic ut for gopool Signed-off-by: zyguan 
<zhongyangguan@gmail.com> --- pkg/executor/distsql.go | 2 +- pkg/executor/utils_test.go | 94 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index 2cc83a6724aa9..ceac4e84005a4 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -727,7 +727,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, initBatchSiz idxID := e.getIndexPlanRootID() e.idxWorkerWg.Add(1) e.pool.submit(func() { - defer trace.StartRegion(ctx, "IndexLookUpIndexWorker").End() + defer trace.StartRegion(ctx, "IndexLookUpIndexTask").End() worker := &indexWorker{ idxLookup: e, finished: e.finished, diff --git a/pkg/executor/utils_test.go b/pkg/executor/utils_test.go index de03f66e5993c..77de775426293 100644 --- a/pkg/executor/utils_test.go +++ b/pkg/executor/utils_test.go @@ -15,6 +15,8 @@ package executor import ( + "runtime" + "sync" "testing" "github.com/pingcap/errors" @@ -177,3 +179,95 @@ func TestEncodePasswordWithPlugin(t *testing.T) { require.True(t, ok) require.Equal(t, "", pwd) } + +func TestGoPool(t *testing.T) { + var ( + list []int + lock sync.Mutex + ) + push := func(i int) { + lock.Lock() + list = append(list, i) + lock.Unlock() + } + clear := func() { + lock.Lock() + list = list[:0] + lock.Unlock() + } + + t.Run("SingleWorker", func(t *testing.T) { + clear() + pool := &gopool{ + TolerablePendingTasks: 0, + MaxWorkers: 1, + } + wg := sync.WaitGroup{} + wg.Add(1) + pool.submit(func() { + push(1) + wg.Add(1) + pool.submit(func() { + push(3) + runtime.Gosched() + push(4) + wg.Done() + }) + runtime.Gosched() + push(2) + wg.Done() + }) + wg.Wait() + require.Equal(t, []int{1, 2, 3, 4}, list) + }) + + t.Run("TwoWorkers", func(t *testing.T) { + clear() + pool := &gopool{ + TolerablePendingTasks: 0, + MaxWorkers: 2, + } + wg := sync.WaitGroup{} + wg.Add(1) + pool.submit(func() { + push(1) + wg.Add(1) + pool.submit(func() { + push(3) + runtime.Gosched() + push(4) + wg.Done() + }) + runtime.Gosched() + push(2) + wg.Done() + }) + wg.Wait() + require.Equal(t, []int{1, 3, 2, 4}, list) + }) + + t.Run("TolerateOnePendingTask", func(t *testing.T) { + clear() + pool := &gopool{ + TolerablePendingTasks: 1, + MaxWorkers: 2, + } + wg := sync.WaitGroup{} + wg.Add(1) + pool.submit(func() { + push(1) + wg.Add(1) + pool.submit(func() { + push(3) + runtime.Gosched() + push(4) + wg.Done() + }) + runtime.Gosched() + push(2) + wg.Done() + }) + wg.Wait() + require.Equal(t, []int{1, 2, 3, 4}, list) + }) +} From a413155cde271ce4f4fc996875196d003ea60f53 Mon Sep 17 00:00:00 2001 From: zyguan <zhongyangguan@gmail.com> Date: Fri, 6 Dec 2024 09:02:47 +0000 Subject: [PATCH 3/5] fix bazel build Signed-off-by: zyguan <zhongyangguan@gmail.com> --- pkg/executor/distsql.go | 8 +++++--- pkg/executor/utils.go | 5 ++++- pkg/executor/utils_test.go | 8 ++++---- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index ceac4e84005a4..b71aa5cb3da4b 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -504,7 +504,7 @@ type IndexLookUpExecutor struct { // cancelFunc is called when close the executor cancelFunc context.CancelFunc workerCtx context.Context - pool gopool + pool *gopool // If dummy flag is set, this is not a real IndexLookUpReader, it just provides the KV ranges for UnionScan. // Used by the temporary table, cached table. 
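The three TestGoPool cases above pin down the pool's spawn heuristic: a worker is started when none exists, or when the backlog exceeds TolerablePendingTasks and MaxWorkers has not been reached; otherwise the task queues FIFO behind the current workers. The snippet below pulls that decision out as a pure predicate so the expected orderings can be checked by hand — shouldSpawn is my name, not the patch's, but the condition is the one in submit().

package main

import "fmt"

// shouldSpawn restates the condition in submit() as a pure predicate
// over the pool's counters at submit time.
func shouldSpawn(workers, pending, maxWorkers, tolerable int32) bool {
	return workers == 0 || (workers < maxWorkers && pending > tolerable)
}

func main() {
	// TwoWorkers: the outer task is running (workers=1, queue empty), so
	// the nested submit raises pending to 1; with tolerance 0 a second
	// worker spawns, and the Gosched-driven test sees [1, 3, 2, 4].
	fmt.Println(shouldSpawn(1, 1, 2, 0)) // true

	// TolerateOnePendingTask: identical state, but pending=1 is tolerated,
	// so the inner task waits for the lone worker => FIFO [1, 2, 3, 4].
	fmt.Println(shouldSpawn(1, 1, 2, 1)) // false

	// SingleWorker: MaxWorkers=1 forbids a second worker outright.
	fmt.Println(shouldSpawn(1, 1, 1, 0)) // false
}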
@@ -640,8 +640,10 @@ func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize in // indexWorker will submit lookup-table tasks (processed by tableWorker) to the pool, // so fetching index and getting table data can run concurrently. e.workerCtx, e.cancelFunc = context.WithCancel(ctx) - e.pool.TolerablePendingTasks = 1 - e.pool.MaxWorkers = int32(max(1, e.indexLookupConcurrency)) + e.pool = &gopool{ + TolerablePendingTasks: 1, + MaxWorkers: int32(max(1, e.indexLookupConcurrency)), + } if err := e.startIndexWorker(ctx, initBatchSize); err != nil { return err } diff --git a/pkg/executor/utils.go b/pkg/executor/utils.go index 9cd1ddbdb85ab..1cc0250ac970a 100644 --- a/pkg/executor/utils.go +++ b/pkg/executor/utils.go @@ -139,8 +139,11 @@ type gopool struct { tasks atomic.Int32 workers atomic.Int32 + // TolerablePendingTasks is the number of tasks that can be tolerated in the queue, that is, the pool won't spawn a + // new goroutine if the number of tasks is less than this number. TolerablePendingTasks int32 - MaxWorkers int32 + // MaxWorkers is the maximum number of workers that the pool can spawn. + MaxWorkers int32 } func (p *gopool) submit(f func()) { diff --git a/pkg/executor/utils_test.go b/pkg/executor/utils_test.go index 77de775426293..66a35335c9146 100644 --- a/pkg/executor/utils_test.go +++ b/pkg/executor/utils_test.go @@ -190,14 +190,14 @@ func TestGoPool(t *testing.T) { list = append(list, i) lock.Unlock() } - clear := func() { + clean := func() { lock.Lock() list = list[:0] lock.Unlock() } t.Run("SingleWorker", func(t *testing.T) { - clear() + clean() pool := &gopool{ TolerablePendingTasks: 0, MaxWorkers: 1, @@ -222,7 +222,7 @@ func TestGoPool(t *testing.T) { }) t.Run("TwoWorkers", func(t *testing.T) { - clear() + clean() pool := &gopool{ TolerablePendingTasks: 0, MaxWorkers: 2, @@ -247,7 +247,7 @@ func TestGoPool(t *testing.T) { }) t.Run("TolerateOnePendingTask", func(t *testing.T) { - clear() + clean() pool := &gopool{ TolerablePendingTasks: 1, MaxWorkers: 2, From d3d3825afd3bcf294031cb5150abc91662b53f89 Mon Sep 17 00:00:00 2001 From: zyguan <zhongyangguan@gmail.com> Date: Fri, 6 Dec 2024 14:32:41 +0000 Subject: [PATCH 4/5] fix data race Signed-off-by: zyguan <zhongyangguan@gmail.com> --- pkg/executor/distsql.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index b71aa5cb3da4b..d7bc442f3019d 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -21,6 +21,7 @@ import ( "runtime/trace" "slices" "sort" + "strconv" "strings" "sync" "sync/atomic" @@ -76,6 +77,7 @@ var LookupTableTaskChannelSize int32 = 50 // lookupTableTask is created from a partial result of an index request which // contains the handles in those index keys. type lookupTableTask struct { + id int handles []kv.Handle rowIdx []int // rowIdx represents the handle index for every row. Only used when keep order. 
rows []chunk.Row @@ -1068,6 +1070,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select continue } task := w.buildTableTask(handles, retChunk) + task.id = taskID taskID++ finishBuild := time.Now() if w.idxLookup.partitionTableMode { @@ -1087,9 +1090,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select case <-e.finished: return default: - tracker := memory.NewTracker(taskID, -1) - tracker.AttachTo(e.memTracker) - execTableTask(e, task, tracker) + execTableTask(e, task) } }) w.resultCh <- task @@ -1212,13 +1213,13 @@ func (w *indexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk) * return task } -func execTableTask(e *IndexLookUpExecutor, task *lookupTableTask, tracker *memory.Tracker) { +func execTableTask(e *IndexLookUpExecutor, task *lookupTableTask) { var ( ctx = e.workerCtx region *trace.Region ) if trace.IsEnabled() { - region = trace.StartRegion(ctx, "IndexLookUpTableTask") + region = trace.StartRegion(ctx, "IndexLookUpTableTask"+strconv.Itoa(task.id)) } defer func() { if r := recover(); r != nil { @@ -1230,6 +1231,8 @@ func execTableTask(e *IndexLookUpExecutor, task *lookupTableTask, tracker *memor region.End() } }() + tracker := memory.NewTracker(task.id, -1) + tracker.AttachTo(e.memTracker) w := &tableWorker{ idxLookup: e, finished: e.finished, From 954be6bc7a3d468cbe798298507f455fa9a80bb7 Mon Sep 17 00:00:00 2001 From: zyguan <zhongyangguan@gmail.com> Date: Fri, 13 Dec 2024 01:12:19 +0000 Subject: [PATCH 5/5] address https://github.com/pingcap/tidb/pull/58033#discussion_r1881553360 Signed-off-by: zyguan <zhongyangguan@gmail.com> --- pkg/executor/distsql.go | 4 ++-- pkg/executor/utils.go | 20 ++++++++++---------- pkg/executor/utils_test.go | 6 +++--- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index d7bc442f3019d..6b6bdbbb2b030 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -506,7 +506,7 @@ type IndexLookUpExecutor struct { // cancelFunc is called when close the executor cancelFunc context.CancelFunc workerCtx context.Context - pool *gopool + pool *workerPool // If dummy flag is set, this is not a real IndexLookUpReader, it just provides the KV ranges for UnionScan. // Used by the temporary table, cached table. @@ -642,7 +642,7 @@ func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize in // indexWorker will submit lookup-table tasks (processed by tableWorker) to the pool, // so fetching index and getting table data can run concurrently. 
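The data race fixed in patch 4 is the classic captured-loop-variable kind: in the earlier revision the pooled closure read taskID — and built its memory.Tracker from it — while the producer goroutine kept incrementing the same variable. The fix stamps an immutable id onto each task before submission and defers tracker construction to execTableTask, which touches only task-local state. A distilled sketch of the bug and the fix, illustrative code rather than TiDB's:

package main

import "sync"

type task struct{ id int }

// buggy mirrors the pre-fix shape: the closure reads the shared loop
// counter, which the producer mutates concurrently (fails under -race).
func buggy(submit func(func()), n int) {
	taskID := 0
	for i := 0; i < n; i++ {
		submit(func() { _ = taskID }) // RACE: taskID changes under us
		taskID++
	}
}

// fixed mirrors patch 4: copy the id into the task before publishing it,
// then let the worker read only the task's own field.
func fixed(submit func(func()), n int) {
	taskID := 0
	for i := 0; i < n; i++ {
		t := &task{id: taskID}
		taskID++
		submit(func() { _ = t.id }) // safe: t.id is immutable after publication
	}
}

func main() {
	var wg sync.WaitGroup
	submit := func(f func()) {
		wg.Add(1)
		go func() { defer wg.Done(); f() }()
	}
	fixed(submit, 4) // swap in buggy(...) to see the race detector fire
	wg.Wait()
}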
e.workerCtx, e.cancelFunc = context.WithCancel(ctx) - e.pool = &gopool{ + e.pool = &workerPool{ TolerablePendingTasks: 1, MaxWorkers: int32(max(1, e.indexLookupConcurrency)), } diff --git a/pkg/executor/utils.go b/pkg/executor/utils.go index 1cc0250ac970a..abd2cb7c060d7 100644 --- a/pkg/executor/utils.go +++ b/pkg/executor/utils.go @@ -123,18 +123,18 @@ func encodePassword(u *ast.UserSpec, authPlugin *extension.AuthPlugin) (string, } var taskPool = sync.Pool{ - New: func() any { return &gotask{} }, + New: func() any { return &workerTask{} }, } -type gotask struct { +type workerTask struct { f func() - next *gotask + next *workerTask } -type gopool struct { +type workerPool struct { lock sync.Mutex - head *gotask - tail *gotask + head *workerTask + tail *workerTask tasks atomic.Int32 workers atomic.Int32 @@ -146,8 +146,8 @@ type gopool struct { MaxWorkers int32 } -func (p *gopool) submit(f func()) { - task := taskPool.Get().(*gotask) +func (p *workerPool) submit(f func()) { + task := taskPool.Get().(*workerTask) task.f, task.next = f, nil p.lock.Lock() if p.head == nil { @@ -165,9 +165,9 @@ func (p *gopool) submit(f func()) { } } -func (p *gopool) run() { +func (p *workerPool) run() { for { - var task *gotask + var task *workerTask p.lock.Lock() if p.head != nil { diff --git a/pkg/executor/utils_test.go b/pkg/executor/utils_test.go index 66a35335c9146..0bcfc2d84362e 100644 --- a/pkg/executor/utils_test.go +++ b/pkg/executor/utils_test.go @@ -198,7 +198,7 @@ func TestGoPool(t *testing.T) { t.Run("SingleWorker", func(t *testing.T) { clean() - pool := &gopool{ + pool := &workerPool{ TolerablePendingTasks: 0, MaxWorkers: 1, } @@ -223,7 +223,7 @@ func TestGoPool(t *testing.T) { t.Run("TwoWorkers", func(t *testing.T) { clean() - pool := &gopool{ + pool := &workerPool{ TolerablePendingTasks: 0, MaxWorkers: 2, } @@ -248,7 +248,7 @@ func TestGoPool(t *testing.T) { t.Run("TolerateOnePendingTask", func(t *testing.T) { clean() - pool := &gopool{ + pool := &workerPool{ TolerablePendingTasks: 1, MaxWorkers: 2, }
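Taken together, the series leaves workerPool with a deliberately small surface: a zero value plus the two tuning knobs is a ready pool, submit never blocks, and there is no Close — each worker exits once it drains the queue, so an idle pool costs nothing and completion tracking stays with the caller (the executor's WaitGroups above). A usage sketch, assuming it sits in the same package as workerPool and that sync is imported:

// poolUsage shows the intended lifecycle; poolUsage is a hypothetical
// helper, not part of the patch.
func poolUsage() []int {
	pool := &workerPool{
		TolerablePendingTasks: 1,
		MaxWorkers:            4,
	}
	var wg sync.WaitGroup
	results := make([]int, 8)
	for i := 0; i < 8; i++ {
		wg.Add(1)
		i := i // per-task copy of the loop variable, as in patch 4
		pool.submit(func() {
			defer wg.Done()
			results[i] = i * i // distinct indices, so no extra locking needed
		})
	}
	wg.Wait() // the pool has no Wait/Close; the caller owns completion
	return results
}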