Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Using atomic instead of mutex and delete scratch slice" #1846

Merged
merged 3 commits into from
Sep 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 91 additions & 74 deletions workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
)

Expand All @@ -22,57 +21,29 @@ type workerPool struct {

// Function for serving server connections.
// It must leave c unclosed.
ready workerChanStack
WorkerFunc ServeHandler

stopCh chan struct{}

connState func(net.Conn, ConnState)

ready []*workerChan

MaxWorkersCount int

MaxIdleWorkerDuration time.Duration

workersCount int32
workersCount int

mustStop atomic.Bool
lock sync.Mutex

LogAllErrors bool
mustStop bool
}

type workerChan struct {
next *workerChan

ch chan net.Conn

lastUseTime int64
}

type workerChanStack struct {
head atomic.Pointer[workerChan]
}

func (s *workerChanStack) push(ch *workerChan) {
for {
oldHead := s.head.Load()
ch.next = oldHead
if s.head.CompareAndSwap(oldHead, ch) {
break
}
}
}

func (s *workerChanStack) pop() *workerChan {
for {
oldHead := s.head.Load()
if oldHead == nil {
return nil
}

if s.head.CompareAndSwap(oldHead, oldHead.next) {
return oldHead
}
}
lastUseTime time.Time
ch chan net.Conn
}

func (wp *workerPool) Start() {
Expand All @@ -87,8 +58,9 @@ func (wp *workerPool) Start() {
}
}
go func() {
var scratch []*workerChan
for {
wp.clean()
wp.clean(&scratch)
select {
case <-stopCh:
return
Expand All @@ -109,15 +81,15 @@ func (wp *workerPool) Stop() {
// Stop all the workers waiting for incoming connections.
// Do not wait for busy workers - they will stop after
// serving the connection and noticing wp.mustStop = true.

for {
ch := wp.ready.pop()
if ch == nil {
break
}
ch.ch <- nil
wp.lock.Lock()
ready := wp.ready
for i := range ready {
ready[i].ch <- nil
ready[i] = nil
}
wp.mustStop.Store(true)
wp.ready = ready[:0]
wp.mustStop = true
wp.lock.Unlock()
}

func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
Expand All @@ -127,22 +99,50 @@ func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
return wp.MaxIdleWorkerDuration
}

func (wp *workerPool) clean() {
func (wp *workerPool) clean(scratch *[]*workerChan) {
maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
criticalTime := time.Now().Add(-maxIdleWorkerDuration).UnixNano()

for {
current := wp.ready.head.Load()
if current == nil || atomic.LoadInt64(&current.lastUseTime) >= criticalTime {
break
}
// Clean least recently used workers if they didn't serve connections
// for more than maxIdleWorkerDuration.
criticalTime := time.Now().Add(-maxIdleWorkerDuration)

wp.lock.Lock()
ready := wp.ready
n := len(ready)

next := current.next
if wp.ready.head.CompareAndSwap(current, next) {
current.ch <- nil
wp.workerChanPool.Put(current)
// Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
l, r := 0, n-1
for l <= r {
mid := (l + r) / 2
if criticalTime.After(wp.ready[mid].lastUseTime) {
l = mid + 1
} else {
r = mid - 1
}
}
i := r
if i == -1 {
wp.lock.Unlock()
return
}

*scratch = append((*scratch)[:0], ready[:i+1]...)
m := copy(ready, ready[i+1:])
for i = m; i < n; i++ {
ready[i] = nil
}
wp.ready = ready[:m]
wp.lock.Unlock()

// Notify obsolete workers to stop.
// This notification must be outside the wp.lock, since ch.ch
// may be blocking and may consume a lot of time if many workers
// are located on non-local CPUs.
tmp := *scratch
for i := range tmp {
tmp[i].ch <- nil
tmp[i] = nil
}
}

func (wp *workerPool) Serve(c net.Conn) bool {
Expand All @@ -169,32 +169,47 @@ var workerChanCap = func() int {
}()

func (wp *workerPool) getCh() *workerChan {
for {
ch := wp.ready.pop()
if ch != nil {
return ch
var ch *workerChan
createWorker := false

wp.lock.Lock()
ready := wp.ready
n := len(ready) - 1
if n < 0 {
if wp.workersCount < wp.MaxWorkersCount {
createWorker = true
wp.workersCount++
}
} else {
ch = ready[n]
ready[n] = nil
wp.ready = ready[:n]
}
wp.lock.Unlock()

currentWorkers := atomic.LoadInt32(&wp.workersCount)
if int(currentWorkers) < wp.MaxWorkersCount {
if atomic.CompareAndSwapInt32(&wp.workersCount, currentWorkers, currentWorkers+1) {
ch = wp.workerChanPool.Get().(*workerChan)
go wp.workerFunc(ch)
return ch
}
} else {
break
if ch == nil {
if !createWorker {
return nil
}
vch := wp.workerChanPool.Get()
ch = vch.(*workerChan)
go func() {
wp.workerFunc(ch)
wp.workerChanPool.Put(vch)
}()
}
return nil
return ch
}

func (wp *workerPool) release(ch *workerChan) bool {
atomic.StoreInt64(&ch.lastUseTime, time.Now().UnixNano())
if wp.mustStop.Load() {
ch.lastUseTime = time.Now()
wp.lock.Lock()
if wp.mustStop {
wp.lock.Unlock()
return false
}
wp.ready.push(ch)
wp.ready = append(wp.ready, ch)
wp.lock.Unlock()
return true
}

Expand Down Expand Up @@ -230,5 +245,7 @@ func (wp *workerPool) workerFunc(ch *workerChan) {
}
}

atomic.AddInt32(&wp.workersCount, -1)
wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
}