From 2ac1891be4dcfd1e95b7074a3621e14ac0eeedbe Mon Sep 17 00:00:00 2001 From: POABOB Date: Sun, 17 Sep 2023 15:55:14 +0800 Subject: [PATCH 1/7] style: make func of retrieveWorker() more readable. --- pool.go | 56 +++++++++++++++++++++++++++----------------------------- 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/pool.go b/pool.go index b0481bb9..742fc59d 100644 --- a/pool.go +++ b/pool.go @@ -339,45 +339,43 @@ func (p *Pool) retrieveWorker() (w worker) { } p.lock.Lock() - w = p.workers.detach() - if w != nil { // first try to fetch the worker from the queue + +retry: + if w = p.workers.detach(); w != nil { + // first try to fetch the worker from the queue p.lock.Unlock() - } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { + return + } + + if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { // if the worker queue is empty and we don't run out of the pool capacity, // then just spawn a new worker goroutine. p.lock.Unlock() spawnWorker() - } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. - if p.options.Nonblocking { - p.lock.Unlock() - return - } - retry: - if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { - p.lock.Unlock() - return - } + return + } + + // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. + if p.options.Nonblocking { + p.lock.Unlock() + return + } - p.addWaiting(1) - p.cond.Wait() // block and wait for an available worker - p.addWaiting(-1) + if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { + p.lock.Unlock() + return + } - if p.IsClosed() { - p.lock.Unlock() - return - } + p.addWaiting(1) + p.cond.Wait() // block and wait for an available worker + p.addWaiting(-1) - if w = p.workers.detach(); w == nil { - if p.Free() > 0 { - p.lock.Unlock() - spawnWorker() - return - } - goto retry - } + if p.IsClosed() { p.lock.Unlock() + return } - return + + goto retry } // revertWorker puts a worker back into free pool, recycling the goroutines. From 2cf6df5c9da1a5283f8783b61cbc3264d44119f4 Mon Sep 17 00:00:00 2001 From: POABOB Date: Sun, 17 Sep 2023 20:51:32 +0800 Subject: [PATCH 2/7] refactor: change the pool_func.go file of retrieveWorker() and fix comments. --- pool.go | 8 +++---- pool_func.go | 60 +++++++++++++++++++++++++--------------------------- 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/pool.go b/pool.go index 742fc59d..77231e7c 100644 --- a/pool.go +++ b/pool.go @@ -341,21 +341,21 @@ func (p *Pool) retrieveWorker() (w worker) { p.lock.Lock() retry: + // First try to fetch the worker from the queue if w = p.workers.detach(); w != nil { - // first try to fetch the worker from the queue p.lock.Unlock() return } + // If the worker queue is empty and we don't run out of the pool capacity, + // then just spawn a new worker goroutine. if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { - // if the worker queue is empty and we don't run out of the pool capacity, - // then just spawn a new worker goroutine. p.lock.Unlock() spawnWorker() return } - // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. + // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. if p.options.Nonblocking { p.lock.Unlock() return diff --git a/pool_func.go b/pool_func.go index 69d3c86c..71c74f9f 100644 --- a/pool_func.go +++ b/pool_func.go @@ -345,45 +345,43 @@ func (p *PoolWithFunc) retrieveWorker() (w worker) { } p.lock.Lock() - w = p.workers.detach() - if w != nil { // first try to fetch the worker from the queue + +retry: + // First try to fetch the worker from the queue + if w = p.workers.detach(); w != nil { p.lock.Unlock() - } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { - // if the worker queue is empty and we don't run out of the pool capacity, - // then just spawn a new worker goroutine. + return + } + + // If the worker queue is empty and we don't run out of the pool capacity, + // then just spawn a new worker goroutine. + if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { p.lock.Unlock() spawnWorker() - } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. - if p.options.Nonblocking { - p.lock.Unlock() - return - } - retry: - if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { - p.lock.Unlock() - return - } + return + } + + // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. + if p.options.Nonblocking { + p.lock.Unlock() + return + } - p.addWaiting(1) - p.cond.Wait() // block and wait for an available worker - p.addWaiting(-1) + if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { + p.lock.Unlock() + return + } - if p.IsClosed() { - p.lock.Unlock() - return - } + p.addWaiting(1) + p.cond.Wait() // block and wait for an available worker + p.addWaiting(-1) - if w = p.workers.detach(); w == nil { - if p.Free() > 0 { - p.lock.Unlock() - spawnWorker() - return - } - goto retry - } + if p.IsClosed() { p.lock.Unlock() + return } - return + + goto retry } // revertWorker puts a worker back into free pool, recycling the goroutines. From d9bfc031ca542aa6366247f81e5909f8a99ff3e5 Mon Sep 17 00:00:00 2001 From: POABOB Date: Sun, 17 Sep 2023 20:58:43 +0800 Subject: [PATCH 3/7] refactor: add a period on the sentence of comments. --- pool.go | 2 +- pool_func.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pool.go b/pool.go index 77231e7c..88ca2a52 100644 --- a/pool.go +++ b/pool.go @@ -341,7 +341,7 @@ func (p *Pool) retrieveWorker() (w worker) { p.lock.Lock() retry: - // First try to fetch the worker from the queue + // First try to fetch the worker from the queue. if w = p.workers.detach(); w != nil { p.lock.Unlock() return diff --git a/pool_func.go b/pool_func.go index 71c74f9f..1dcafcad 100644 --- a/pool_func.go +++ b/pool_func.go @@ -347,7 +347,7 @@ func (p *PoolWithFunc) retrieveWorker() (w worker) { p.lock.Lock() retry: - // First try to fetch the worker from the queue + // First try to fetch the worker from the queue. if w = p.workers.detach(); w != nil { p.lock.Unlock() return From 690300fd24588c0315e3f7c3b9d93e766ea8d1cd Mon Sep 17 00:00:00 2001 From: POABOB Date: Sun, 17 Sep 2023 21:21:18 +0800 Subject: [PATCH 4/7] refactor: combine two conditions into one. --- pool.go | 10 +++------- pool_func.go | 10 +++------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/pool.go b/pool.go index 88ca2a52..5d4c0c24 100644 --- a/pool.go +++ b/pool.go @@ -355,17 +355,13 @@ retry: return } - // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. - if p.options.Nonblocking { - p.lock.Unlock() - return - } - - if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { + // Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value. + if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) { p.lock.Unlock() return } + // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. p.addWaiting(1) p.cond.Wait() // block and wait for an available worker p.addWaiting(-1) diff --git a/pool_func.go b/pool_func.go index 1dcafcad..8b1abf6c 100644 --- a/pool_func.go +++ b/pool_func.go @@ -361,17 +361,13 @@ retry: return } - // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. - if p.options.Nonblocking { - p.lock.Unlock() - return - } - - if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks { + // Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value. + if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) { p.lock.Unlock() return } + // Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool. p.addWaiting(1) p.cond.Wait() // block and wait for an available worker p.addWaiting(-1) From a92f6c8421c873b6194d6b7df8aaf454c1beaa24 Mon Sep 17 00:00:00 2001 From: POABOB Date: Sun, 17 Sep 2023 21:53:27 +0800 Subject: [PATCH 5/7] refactor: eliminate the local function spawnWorker() and use the code directly below p.Cap(). --- pool.go | 8 ++------ pool_func.go | 8 ++------ 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/pool.go b/pool.go index 5d4c0c24..bdf34541 100644 --- a/pool.go +++ b/pool.go @@ -333,11 +333,6 @@ func (p *Pool) addWaiting(delta int) { // retrieveWorker returns an available worker to run the tasks. func (p *Pool) retrieveWorker() (w worker) { - spawnWorker := func() { - w = p.workerCache.Get().(*goWorker) - w.run() - } - p.lock.Lock() retry: @@ -351,7 +346,8 @@ retry: // then just spawn a new worker goroutine. if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { p.lock.Unlock() - spawnWorker() + w = p.workerCache.Get().(*goWorker) + w.run() return } diff --git a/pool_func.go b/pool_func.go index 8b1abf6c..e41ae0e5 100644 --- a/pool_func.go +++ b/pool_func.go @@ -339,11 +339,6 @@ func (p *PoolWithFunc) addWaiting(delta int) { // retrieveWorker returns an available worker to run the tasks. func (p *PoolWithFunc) retrieveWorker() (w worker) { - spawnWorker := func() { - w = p.workerCache.Get().(*goWorkerWithFunc) - w.run() - } - p.lock.Lock() retry: @@ -357,7 +352,8 @@ retry: // then just spawn a new worker goroutine. if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { p.lock.Unlock() - spawnWorker() + w = p.workerCache.Get().(*goWorkerWithFunc) + w.run() return } From e209aae7b522eb86ed1769204e3092dd095b2b48 Mon Sep 17 00:00:00 2001 From: POABOB Date: Wed, 18 Oct 2023 10:38:51 +0800 Subject: [PATCH 6/7] refactor: make the code cleaner & unify the calculation of mid in binary search. --- worker_loop_queue.go | 20 ++++++-------------- worker_stack.go | 2 +- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/worker_loop_queue.go b/worker_loop_queue.go index 5db8bb72..80602ec8 100644 --- a/worker_loop_queue.go +++ b/worker_loop_queue.go @@ -19,15 +19,12 @@ func newWorkerLoopQueue(size int) *loopQueue { } func (wq *loopQueue) len() int { - if wq.size == 0 { + if wq.size == 0 || wq.isEmpty() { return 0 } - if wq.head == wq.tail { - if wq.isFull { - return wq.size - } - return 0 + if wq.head == wq.tail && wq.isFull { + return wq.size } if wq.tail > wq.head { @@ -50,11 +47,8 @@ func (wq *loopQueue) insert(w worker) error { return errQueueIsFull } wq.items[wq.tail] = w - wq.tail++ + wq.tail = (wq.tail + 1) % wq.size - if wq.tail == wq.size { - wq.tail = 0 - } if wq.tail == wq.head { wq.isFull = true } @@ -69,10 +63,8 @@ func (wq *loopQueue) detach() worker { w := wq.items[wq.head] wq.items[wq.head] = nil - wq.head++ - if wq.head == wq.size { - wq.head = 0 - } + wq.head = (wq.head + 1) % wq.size + wq.isFull = false return w diff --git a/worker_stack.go b/worker_stack.go index 0843dd44..6b01abcd 100644 --- a/worker_stack.go +++ b/worker_stack.go @@ -62,7 +62,7 @@ func (wq *workerStack) refresh(duration time.Duration) []worker { func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int { for l <= r { - mid := int(uint(l+r) >> 1) // avoid overflow when computing mid + mid := l + ((r - l) >> 1) // avoid overflow when computing mid if expiryTime.Before(wq.items[mid].lastUsedTime()) { r = mid - 1 } else { From 9a263144292af5aebff9b73af5fcabdd6b301970 Mon Sep 17 00:00:00 2001 From: POABOB Date: Wed, 18 Oct 2023 14:45:05 +0800 Subject: [PATCH 7/7] refactor: add a comment on the variable mid in worker_loop_queue.go. --- worker_loop_queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker_loop_queue.go b/worker_loop_queue.go index 80602ec8..a5451ab5 100644 --- a/worker_loop_queue.go +++ b/worker_loop_queue.go @@ -126,7 +126,7 @@ func (wq *loopQueue) binarySearch(expiryTime time.Time) int { basel = wq.head l := 0 for l <= r { - mid = l + ((r - l) >> 1) + mid = l + ((r - l) >> 1) // avoid overflow when computing mid // calculate true mid position from mapped mid position tmid = (mid + basel + nlen) % nlen if expiryTime.Before(wq.items[tmid].lastUsedTime()) {