From 1d93c12146e1dd42a758f1fcb95d5453890f40dc Mon Sep 17 00:00:00 2001 From: Alejandro Durante Date: Fri, 15 Nov 2024 20:19:17 -0300 Subject: [PATCH] fix(dispatcher): ensure workers exit reliably when worker count is low --- .github/workflows/main.yml | 2 +- pool.go | 21 +++++++++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index e9e5e6b..360295f 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -11,7 +11,7 @@ jobs: strategy: matrix: go-version: [1.22.x, 1.21.x, 1.20.x] - os: [ubuntu-latest, macos-latest] + os: [ubuntu-latest, macos-latest, windows-latest] runs-on: ${{ matrix.os }} steps: - name: Checkout code diff --git a/pool.go b/pool.go index 42be075..b93dade 100644 --- a/pool.go +++ b/pool.go @@ -12,9 +12,9 @@ import ( "github.com/alitto/pond/v2/internal/future" ) -var MAX_TASKS_CHAN_LENGTH = runtime.NumCPU() * 128 +var NUM_CPU = runtime.NumCPU() -var PERSISTENT_WORKER_COUNT = int64(runtime.NumCPU()) +var MAX_TASKS_CHAN_LENGTH = NUM_CPU * 128 var ErrPoolStopped = errors.New("pool stopped") @@ -241,14 +241,13 @@ func (p *pool) startWorker(limit int) { return } p.workerWaitGroup.Add(1) - workerNumber := p.workerCount.Add(1) - // Guarantee at least PERSISTENT_WORKER_COUNT workers are always running during dispatch to prevent deadlocks - canExitDuringDispatch := workerNumber > PERSISTENT_WORKER_COUNT - go p.worker(canExitDuringDispatch) + p.workerCount.Add(1) + go p.worker() } -func (p *pool) workerCanExit(canExitDuringDispatch bool) bool { - if canExitDuringDispatch { +func (p *pool) workerCanExit() bool { + if int(p.workerCount.Load()) > NUM_CPU { + // If there are more workers than CPUs, then we can trust workerCount to be at least 1 p.workerCount.Add(-1) return true } @@ -256,8 +255,10 @@ func (p *pool) workerCanExit(canExitDuringDispatch bool) bool { // Check if the dispatcher is running if !p.dispatcherRunning.TryLock() { // Dispatcher is running, cannot exit yet + runtime.Gosched() return false } + if len(p.tasks) > 0 { // There are tasks in the queue, cannot exit yet p.dispatcherRunning.Unlock() @@ -269,7 +270,7 @@ func (p *pool) workerCanExit(canExitDuringDispatch bool) bool { return true } -func (p *pool) worker(canExitDuringDispatch bool) { +func (p *pool) worker() { defer func() { p.workerWaitGroup.Done() }() @@ -306,7 +307,7 @@ func (p *pool) worker(canExitDuringDispatch bool) { // No tasks left // Check if the worker can exit - if p.workerCanExit(canExitDuringDispatch) { + if p.workerCanExit() { return } continue