diff --git a/pool.go b/pool.go index 89ad35d7..ddd1fc34 100644 --- a/pool.go +++ b/pool.go @@ -56,8 +56,8 @@ type Pool struct { // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker. workerCache sync.Pool - // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock - blockingNum int + // waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock + waiting int32 heartbeatDone int32 stopHeartbeat context.CancelFunc @@ -97,10 +97,11 @@ func (p *Pool) purgePeriodically(ctx context.Context) { expiredWorkers[i] = nil } - // There might be a situation that all workers have been cleaned up(no any worker is running) + // There might be a situation where all workers have been cleaned up(no worker is running), + // or another case where the pool capacity has been Tuned up, // while some invokers still get stuck in "p.cond.Wait()", // then it ought to wake all those invokers. - if p.Running() == 0 { + if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) { p.cond.Broadcast() } } @@ -174,12 +175,12 @@ func (p *Pool) Submit(task func()) error { return nil } -// Running returns the amount of the currently running goroutines. +// Running returns the number of workers currently running. func (p *Pool) Running() int { return int(atomic.LoadInt32(&p.running)) } -// Free returns the amount of available goroutines to work, -1 indicates this pool is unlimited. +// Free returns the number of available goroutines to work, -1 indicates this pool is unlimited. func (p *Pool) Free() int { c := p.Cap() if c < 0 { @@ -188,6 +189,11 @@ func (p *Pool) Free() int { return c - p.Running() } +// Waiting returns the number of tasks which are waiting be executed. +func (p *Pool) Waiting() int { + return int(atomic.LoadInt32(&p.waiting)) +} + // Cap returns the capacity of this pool. func (p *Pool) Cap() int { return int(atomic.LoadInt32(&p.capacity)) @@ -259,14 +265,12 @@ func (p *Pool) Reboot() { // --------------------------------------------------------------------------- -// incRunning increases the number of the currently running goroutines. -func (p *Pool) incRunning() { - atomic.AddInt32(&p.running, 1) +func (p *Pool) addRunning(delta int) { + atomic.AddInt32(&p.running, int32(delta)) } -// decRunning decreases the number of the currently running goroutines. -func (p *Pool) decRunning() { - atomic.AddInt32(&p.running, -1) +func (p *Pool) addWaiting(delta int) { + atomic.AddInt32(&p.waiting, int32(delta)) } // retrieveWorker returns an available worker to run the tasks. @@ -292,30 +296,33 @@ func (p *Pool) retrieveWorker() (w *goWorker) { return } retry: - if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks { + if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { p.lock.Unlock() return } - p.blockingNum++ + p.addWaiting(1) p.cond.Wait() // block and wait for an available worker - p.blockingNum-- + p.addWaiting(-1) + + if p.IsClosed() { + p.lock.Unlock() + return + } + var nw int if nw = p.Running(); nw == 0 { // awakened by the scavenger p.lock.Unlock() - if !p.IsClosed() { - spawnWorker() - } + spawnWorker() return } if w = p.workers.detach(); w == nil { - if nw < capacity { + if nw < p.Cap() { p.lock.Unlock() spawnWorker() return } goto retry } - p.lock.Unlock() } return diff --git a/pool_func.go b/pool_func.go index bfa4ccb4..3b5d5a76 100644 --- a/pool_func.go +++ b/pool_func.go @@ -58,8 +58,8 @@ type PoolWithFunc struct { // workerCache speeds up the obtainment of a usable worker in function:retrieveWorker. workerCache sync.Pool - // blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock - blockingNum int + // waiting is the number of the goroutines already been blocked on pool.Invoke(), protected by pool.lock + waiting int32 heartbeatDone int32 stopHeartbeat context.CancelFunc @@ -112,10 +112,11 @@ func (p *PoolWithFunc) purgePeriodically(ctx context.Context) { expiredWorkers[i] = nil } - // There might be a situation that all workers have been cleaned up(no worker is running) + // There might be a situation where all workers have been cleaned up(no worker is running), + // or another case where the pool capacity has been Tuned up, // while some invokers still get stuck in "p.cond.Wait()", // then it ought to wake all those invokers. - if p.Running() == 0 { + if p.Running() == 0 || (p.Waiting() > 0 && p.Free() > 0) { p.cond.Broadcast() } } @@ -191,12 +192,12 @@ func (p *PoolWithFunc) Invoke(args interface{}) error { return nil } -// Running returns the amount of the currently running goroutines. +// Running returns the number of workers currently running. func (p *PoolWithFunc) Running() int { return int(atomic.LoadInt32(&p.running)) } -// Free returns the amount of available goroutines to work, -1 indicates this pool is unlimited. +// Free returns the number of available goroutines to work, -1 indicates this pool is unlimited. func (p *PoolWithFunc) Free() int { c := p.Cap() if c < 0 { @@ -205,6 +206,11 @@ func (p *PoolWithFunc) Free() int { return c - p.Running() } +// Waiting returns the number of tasks which are waiting be executed. +func (p *PoolWithFunc) Waiting() int { + return int(atomic.LoadInt32(&p.waiting)) +} + // Cap returns the capacity of this pool. func (p *PoolWithFunc) Cap() int { return int(atomic.LoadInt32(&p.capacity)) @@ -280,14 +286,12 @@ func (p *PoolWithFunc) Reboot() { //--------------------------------------------------------------------------- -// incRunning increases the number of the currently running goroutines. -func (p *PoolWithFunc) incRunning() { - atomic.AddInt32(&p.running, 1) +func (p *PoolWithFunc) addRunning(delta int) { + atomic.AddInt32(&p.running, int32(delta)) } -// decRunning decreases the number of the currently running goroutines. -func (p *PoolWithFunc) decRunning() { - atomic.AddInt32(&p.running, -1) +func (p *PoolWithFunc) addWaiting(delta int) { + atomic.AddInt32(&p.waiting, int32(delta)) } // retrieveWorker returns an available worker to run the tasks. @@ -316,24 +320,28 @@ func (p *PoolWithFunc) retrieveWorker() (w *goWorkerWithFunc) { return } retry: - if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks { + if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { p.lock.Unlock() return } - p.blockingNum++ + p.addWaiting(1) p.cond.Wait() // block and wait for an available worker - p.blockingNum-- + p.addWaiting(-1) + + if p.IsClosed() { + p.lock.Unlock() + return + } + var nw int if nw = p.Running(); nw == 0 { // awakened by the scavenger p.lock.Unlock() - if !p.IsClosed() { - spawnWorker() - } + spawnWorker() return } l := len(p.workers) - 1 if l < 0 { - if nw < capacity { + if nw < p.Cap() { p.lock.Unlock() spawnWorker() return diff --git a/worker.go b/worker.go index b804e9f9..a2bee572 100644 --- a/worker.go +++ b/worker.go @@ -44,10 +44,10 @@ type goWorker struct { // run starts a goroutine to repeat the process // that performs the function calls. func (w *goWorker) run() { - w.pool.incRunning() + w.pool.addRunning(1) go func() { defer func() { - w.pool.decRunning() + w.pool.addRunning(-1) w.pool.workerCache.Put(w) if p := recover(); p != nil { if ph := w.pool.options.PanicHandler; ph != nil { diff --git a/worker_func.go b/worker_func.go index 9fe35e95..d15224d8 100644 --- a/worker_func.go +++ b/worker_func.go @@ -44,10 +44,10 @@ type goWorkerWithFunc struct { // run starts a goroutine to repeat the process // that performs the function calls. func (w *goWorkerWithFunc) run() { - w.pool.incRunning() + w.pool.addRunning(1) go func() { defer func() { - w.pool.decRunning() + w.pool.addRunning(-1) w.pool.workerCache.Put(w) if p := recover(); p != nil { if ph := w.pool.options.PanicHandler; ph != nil { diff --git a/worker_loop_queue_test.go b/worker_loop_queue_test.go index f0c0b5f3..1a2e17c1 100644 --- a/worker_loop_queue_test.go +++ b/worker_loop_queue_test.go @@ -1,3 +1,4 @@ +//go:build !windows // +build !windows package ants diff --git a/worker_stack_test.go b/worker_stack_test.go index 1bddf9d6..94b6055c 100644 --- a/worker_stack_test.go +++ b/worker_stack_test.go @@ -1,3 +1,4 @@ +//go:build !windows // +build !windows package ants