From 7aaa4349f58c6990c283dbaaca37622bf9051871 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sun, 20 Oct 2019 18:22:13 +0800 Subject: [PATCH] Optimize loop queue of workers --- pool.go | 4 +- worker_array.go | 12 ++--- worker_loop_queue.go | 96 +++++++++++++++++++++++---------------- worker_loop_queue_test.go | 2 +- worker_stack.go | 15 ++---- worker_stack_test.go | 14 +++--- 6 files changed, 79 insertions(+), 64 deletions(-) diff --git a/pool.go b/pool.go index f97e1d7e..5e13c727 100644 --- a/pool.go +++ b/pool.go @@ -73,7 +73,7 @@ func (p *Pool) periodicallyPurge() { } p.lock.Lock() - expiredWorkers := p.workers.findOutExpiry(p.options.ExpiryDuration) + expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration) p.lock.Unlock() // Notify obsolete workers to stop. @@ -180,7 +180,7 @@ func (p *Pool) Release() { p.once.Do(func() { atomic.StoreInt32(&p.release, 1) p.lock.Lock() - p.workers.release() + p.workers.reset() p.lock.Unlock() }) } diff --git a/worker_array.go b/worker_array.go index 053ea329..a45f89aa 100644 --- a/worker_array.go +++ b/worker_array.go @@ -6,11 +6,11 @@ import ( ) var ( - // ErrQueueIsFull will be returned when the worker queue is full. - ErrQueueIsFull = errors.New("the queue is full") + // errQueueIsFull will be returned when the worker queue is full. + errQueueIsFull = errors.New("the queue is full") - // ErrQueueLengthIsZero will be returned when trying to insert item to a released worker queue. - ErrQueueLengthIsZero = errors.New("the queue length is zero") + // errQueueIsReleased will be returned when trying to insert item to a released worker queue. + errQueueIsReleased = errors.New("the queue length is zero") ) type workerArray interface { @@ -18,8 +18,8 @@ type workerArray interface { isEmpty() bool insert(worker *goWorker) error detach() *goWorker - findOutExpiry(duration time.Duration) []*goWorker - release() + retrieveExpiry(duration time.Duration) []*goWorker + reset() } type arrayType int diff --git a/worker_loop_queue.go b/worker_loop_queue.go index 625861dd..3c15e556 100644 --- a/worker_loop_queue.go +++ b/worker_loop_queue.go @@ -3,95 +3,115 @@ package ants import "time" type loopQueue struct { - items []*goWorker - expiry []*goWorker - head int - tail int - remainder int + items []*goWorker + expiry []*goWorker + head int + tail int + size int + isFull bool } func newWorkerLoopQueue(size int) *loopQueue { - if size <= 0 { - return nil - } - - wq := loopQueue{ - items: make([]*goWorker, size+1), - head: 0, - tail: 0, - remainder: size + 1, + return &loopQueue{ + items: make([]*goWorker, size), + size: size, } - - return &wq } func (wq *loopQueue) len() int { - if wq.remainder == 0 { + if wq.size == 0 { return 0 } - return (wq.tail - wq.head + wq.remainder) % wq.remainder + if wq.head == wq.tail { + if wq.isFull { + return wq.size + } + return 0 + } + + if wq.tail > wq.head { + return wq.tail - wq.head + } + + return wq.size - wq.head + wq.tail } func (wq *loopQueue) isEmpty() bool { - return wq.tail == wq.head + return wq.head == wq.tail && !wq.isFull } func (wq *loopQueue) insert(worker *goWorker) error { - if wq.remainder == 0 { - return ErrQueueLengthIsZero - } - next := (wq.tail + 1) % wq.remainder - if next == wq.head { - return ErrQueueIsFull + if wq.size == 0 { + return errQueueIsReleased } + if wq.isFull { + return errQueueIsFull + } wq.items[wq.tail] = worker - wq.tail = next + wq.tail++ + + if wq.tail == wq.size { + wq.tail = 0 + } + if wq.tail == wq.head { + wq.isFull = true + } return nil } func (wq *loopQueue) detach() *goWorker { - if wq.len() == 0 { + if wq.isEmpty() { return nil } w := wq.items[wq.head] - wq.head = (wq.head + 1) % wq.remainder + wq.head++ + if wq.head == wq.size { + wq.head = 0 + } + wq.isFull = false return w } -func (wq *loopQueue) findOutExpiry(duration time.Duration) []*goWorker { - if wq.len() == 0 { +func (wq *loopQueue) retrieveExpiry(duration time.Duration) []*goWorker { + if wq.isEmpty() { return nil } wq.expiry = wq.expiry[:0] expiryTime := time.Now().Add(-duration) - for wq.head != wq.tail { + for !wq.isEmpty() { if expiryTime.Before(wq.items[wq.head].recycleTime) { break } wq.expiry = append(wq.expiry, wq.items[wq.head]) - wq.head = (wq.head + 1) % wq.remainder + wq.head++ + if wq.head == wq.size { + wq.head = 0 + } + wq.isFull = false } + return wq.expiry } -func (wq *loopQueue) release() { - if wq.len() == 0 { +func (wq *loopQueue) reset() { + if wq.isEmpty() { return } - for wq.head != wq.tail { - wq.items[wq.head].task <- nil - wq.head = (wq.head + 1) % wq.remainder +Releasing: + if w := wq.detach(); w != nil { + w.task <- nil + goto Releasing } wq.items = wq.items[:0] - wq.remainder = 0 + wq.size = 0 wq.head = 0 wq.tail = 0 } diff --git a/worker_loop_queue_test.go b/worker_loop_queue_test.go index db626586..573bff88 100644 --- a/worker_loop_queue_test.go +++ b/worker_loop_queue_test.go @@ -61,7 +61,7 @@ func TestLoopQueue(t *testing.T) { t.Fatalf("Enqueue error") } - q.findOutExpiry(time.Second) + q.retrieveExpiry(time.Second) if q.len() != 6 { t.Fatalf("Len error: %d", q.len()) diff --git a/worker_stack.go b/worker_stack.go index bdf710ea..6d74b434 100644 --- a/worker_stack.go +++ b/worker_stack.go @@ -9,15 +9,10 @@ type workerStack struct { } func newWorkerStack(size int) *workerStack { - if size < 0 { - return nil - } - - wq := workerStack{ + return &workerStack{ items: make([]*goWorker, 0, size), size: size, } - return &wq } func (wq *workerStack) len() int { @@ -45,14 +40,14 @@ func (wq *workerStack) detach() *goWorker { return w } -func (wq *workerStack) findOutExpiry(duration time.Duration) []*goWorker { +func (wq *workerStack) retrieveExpiry(duration time.Duration) []*goWorker { n := wq.len() if n == 0 { return nil } expiryTime := time.Now().Add(-duration) - index := wq.search(0, n-1, expiryTime) + index := wq.binarySearch(0, n-1, expiryTime) wq.expiry = wq.expiry[:0] if index != -1 { @@ -63,7 +58,7 @@ func (wq *workerStack) findOutExpiry(duration time.Duration) []*goWorker { return wq.expiry } -func (wq *workerStack) search(l, r int, expiryTime time.Time) int { +func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int { var mid int for l <= r { mid = (l + r) / 2 @@ -76,7 +71,7 @@ func (wq *workerStack) search(l, r int, expiryTime time.Time) int { return r } -func (wq *workerStack) release() { +func (wq *workerStack) reset() { for i := 0; i < wq.len(); i++ { wq.items[i].task <- nil } diff --git a/worker_stack_test.go b/worker_stack_test.go index c9905cc8..5a55663b 100644 --- a/worker_stack_test.go +++ b/worker_stack_test.go @@ -54,7 +54,7 @@ func TestWorkerStack(t *testing.T) { t.Fatalf("Len error") } - q.findOutExpiry(time.Second) + q.retrieveExpiry(time.Second) if q.len() != 6 { t.Fatalf("Len error") @@ -69,12 +69,12 @@ func TestSearch(t *testing.T) { _ = q.insert(&goWorker{recycleTime: time.Now()}) - index := q.search(0, q.len()-1, time.Now()) + index := q.binarySearch(0, q.len()-1, time.Now()) if index != 0 { t.Fatalf("should is 0") } - index = q.search(0, q.len()-1, expiry1) + index = q.binarySearch(0, q.len()-1, expiry1) if index != -1 { t.Fatalf("should is -1") } @@ -83,17 +83,17 @@ func TestSearch(t *testing.T) { expiry2 := time.Now() _ = q.insert(&goWorker{recycleTime: time.Now()}) - index = q.search(0, q.len()-1, expiry1) + index = q.binarySearch(0, q.len()-1, expiry1) if index != -1 { t.Fatalf("should is -1") } - index = q.search(0, q.len()-1, expiry2) + index = q.binarySearch(0, q.len()-1, expiry2) if index != 0 { t.Fatalf("should is 0") } - index = q.search(0, q.len()-1, time.Now()) + index = q.binarySearch(0, q.len()-1, time.Now()) if index != 1 { t.Fatalf("should is 1") } @@ -111,7 +111,7 @@ func TestSearch(t *testing.T) { _ = q.insert(&goWorker{recycleTime: time.Now()}) } - index = q.search(0, q.len()-1, expiry3) + index = q.binarySearch(0, q.len()-1, expiry3) if index != 7 { t.Fatalf("should is 7") }