diff --git a/workerpool.go b/workerpool.go index 9ecd9481df..bc3a3c5bb4 100644 --- a/workerpool.go +++ b/workerpool.go @@ -6,6 +6,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "time" ) @@ -21,29 +22,57 @@ type workerPool struct { // Function for serving server connections. // It must leave c unclosed. + ready workerChanStack WorkerFunc ServeHandler stopCh chan struct{} connState func(net.Conn, ConnState) - ready []*workerChan - MaxWorkersCount int MaxIdleWorkerDuration time.Duration - workersCount int + workersCount int32 - lock sync.Mutex + mustStop atomic.Bool LogAllErrors bool - mustStop bool } type workerChan struct { - lastUseTime time.Time - ch chan net.Conn + next *workerChan + + ch chan net.Conn + + lastUseTime int64 +} + +type workerChanStack struct { + head atomic.Pointer[workerChan] +} + +func (s *workerChanStack) push(ch *workerChan) { + for { + oldHead := s.head.Load() + ch.next = oldHead + if s.head.CompareAndSwap(oldHead, ch) { + break + } + } +} + +func (s *workerChanStack) pop() *workerChan { + for { + oldHead := s.head.Load() + if oldHead == nil { + return nil + } + + if s.head.CompareAndSwap(oldHead, oldHead.next) { + return oldHead + } + } } func (wp *workerPool) Start() { @@ -58,9 +87,8 @@ func (wp *workerPool) Start() { } } go func() { - var scratch []*workerChan for { - wp.clean(&scratch) + wp.clean() select { case <-stopCh: return @@ -81,15 +109,15 @@ func (wp *workerPool) Stop() { // Stop all the workers waiting for incoming connections. // Do not wait for busy workers - they will stop after // serving the connection and noticing wp.mustStop = true. - wp.lock.Lock() - ready := wp.ready - for i := range ready { - ready[i].ch <- nil - ready[i] = nil + + for { + ch := wp.ready.pop() + if ch == nil { + break + } + ch.ch <- nil } - wp.ready = ready[:0] - wp.mustStop = true - wp.lock.Unlock() + wp.mustStop.Store(true) } func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration { @@ -99,49 +127,21 @@ func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration { return wp.MaxIdleWorkerDuration } -func (wp *workerPool) clean(scratch *[]*workerChan) { +func (wp *workerPool) clean() { maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration() + criticalTime := time.Now().Add(-maxIdleWorkerDuration).UnixNano() - // Clean least recently used workers if they didn't serve connections - // for more than maxIdleWorkerDuration. - criticalTime := time.Now().Add(-maxIdleWorkerDuration) - - wp.lock.Lock() - ready := wp.ready - n := len(ready) - - // Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up. - l, r := 0, n-1 - for l <= r { - mid := (l + r) / 2 - if criticalTime.After(wp.ready[mid].lastUseTime) { - l = mid + 1 - } else { - r = mid - 1 + for { + current := wp.ready.head.Load() + if current == nil || atomic.LoadInt64(¤t.lastUseTime) >= criticalTime { + break } - } - i := r - if i == -1 { - wp.lock.Unlock() - return - } - *scratch = append((*scratch)[:0], ready[:i+1]...) - m := copy(ready, ready[i+1:]) - for i = m; i < n; i++ { - ready[i] = nil - } - wp.ready = ready[:m] - wp.lock.Unlock() - - // Notify obsolete workers to stop. - // This notification must be outside the wp.lock, since ch.ch - // may be blocking and may consume a lot of time if many workers - // are located on non-local CPUs. - tmp := *scratch - for i := range tmp { - tmp[i].ch <- nil - tmp[i] = nil + next := current.next + if wp.ready.head.CompareAndSwap(current, next) { + current.ch <- nil + wp.workerChanPool.Put(current) + } } } @@ -169,47 +169,32 @@ var workerChanCap = func() int { }() func (wp *workerPool) getCh() *workerChan { - var ch *workerChan - createWorker := false - - wp.lock.Lock() - ready := wp.ready - n := len(ready) - 1 - if n < 0 { - if wp.workersCount < wp.MaxWorkersCount { - createWorker = true - wp.workersCount++ + for { + ch := wp.ready.pop() + if ch != nil { + return ch } - } else { - ch = ready[n] - ready[n] = nil - wp.ready = ready[:n] - } - wp.lock.Unlock() - if ch == nil { - if !createWorker { - return nil + currentWorkers := atomic.LoadInt32(&wp.workersCount) + if currentWorkers < int32(wp.MaxWorkersCount) { + if atomic.CompareAndSwapInt32(&wp.workersCount, currentWorkers, currentWorkers+1) { + ch = wp.workerChanPool.Get().(*workerChan) + go wp.workerFunc(ch) + return ch + } + } else { + break } - vch := wp.workerChanPool.Get() - ch = vch.(*workerChan) - go func() { - wp.workerFunc(ch) - wp.workerChanPool.Put(vch) - }() } - return ch + return nil } func (wp *workerPool) release(ch *workerChan) bool { - ch.lastUseTime = time.Now() - wp.lock.Lock() - if wp.mustStop { - wp.lock.Unlock() + atomic.StoreInt64(&ch.lastUseTime, time.Now().UnixNano()) + if wp.mustStop.Load() { return false } - wp.ready = append(wp.ready, ch) - wp.lock.Unlock() + wp.ready.push(ch) return true } @@ -245,7 +230,5 @@ func (wp *workerPool) workerFunc(ch *workerChan) { } } - wp.lock.Lock() - wp.workersCount-- - wp.lock.Unlock() + atomic.AddInt32(&wp.workersCount, -1) }