Skip to content

Commit

Permalink
(2.11) Internal: Small udpates to ipQueue (#5940)
Browse files Browse the repository at this point in the history
Small updates to ipQueue:
- Document that caller must not reuse the slice after call to recycle().
- Reset size to 0 on pop() or last popOne() without need to call
q.caclc().
- Added missing recycle() calls in some places.
- Added benchmarks to check perf impact on future changes.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
derekcollison authored Sep 30, 2024
2 parents a97041b + fb4bc59 commit 1326e6c
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 87 deletions.
87 changes: 38 additions & 49 deletions server/ipqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,15 @@ var errIPQSizeLimitReached = errors.New("IPQ size limit reached")

func newIPQueue[T any](s *Server, name string, opts ...ipQueueOpt[T]) *ipQueue[T] {
q := &ipQueue[T]{
ch: make(chan struct{}, 1),
pool: &sync.Pool{},
ch: make(chan struct{}, 1),
pool: &sync.Pool{
New: func() any {
// Reason we use pointer to slice instead of slice is explained
// here: https://staticcheck.io/docs/checks#SA6002
res := make([]T, 0, 32)
return &res
},
},
name: name,
m: &s.ipQueues,
ipQueueOpts: ipQueueOpts[T]{
Expand All @@ -104,25 +111,12 @@ func newIPQueue[T any](s *Server, name string, opts ...ipQueueOpt[T]) *ipQueue[T
// entry is the first to be added, and returns the length of the queue after
// this element is added.
func (q *ipQueue[T]) push(e T) (int, error) {
var signal bool
q.Lock()
l := len(q.elts) - q.pos
if q.mlen > 0 && l == q.mlen {
q.Unlock()
return l, errIPQLenLimitReached
}
if l == 0 {
signal = true
eltsi := q.pool.Get()
if eltsi != nil {
// Reason we use pointer to slice instead of slice is explained
// here: https://staticcheck.io/docs/checks#SA6002
q.elts = (*(eltsi.(*[]T)))[:0]
}
if cap(q.elts) == 0 {
q.elts = make([]T, 0, 32)
}
}
if q.calc != nil {
sz := q.calc(e)
if q.msz > 0 && q.sz+sz > q.msz {
Expand All @@ -131,9 +125,13 @@ func (q *ipQueue[T]) push(e T) (int, error) {
}
q.sz += sz
}
if q.elts == nil {
// What comes out of the pool is already of size 0, so no need for [:0].
q.elts = *(q.pool.Get().(*[]T))
}
q.elts = append(q.elts, e)
q.Unlock()
if signal {
if l == 0 {
select {
case q.ch <- struct{}{}:
default:
Expand All @@ -155,29 +153,23 @@ func (q *ipQueue[T]) pop() []T {
if q == nil {
return nil
}
var elts []T
q.Lock()
if len(q.elts)-q.pos == 0 {
q.Unlock()
return nil
}
var elts []T
if q.pos == 0 {
elts = q.elts
} else {
elts = q.elts[q.pos:]
}
q.elts, q.pos = nil, 0
q.elts, q.pos, q.sz = nil, 0, 0
atomic.AddInt64(&q.inprogress, int64(len(elts)))
if q.calc != nil {
for _, e := range elts {
q.sz -= q.calc(e)
}
}
q.Unlock()
return elts
}

func (q *ipQueue[T]) resetAndReturnToPool(elts *[]T) {
(*elts) = (*elts)[:0]
q.pool.Put(elts)
}

// Returns the first element from the queue, if any. See comment above
// regarding calling after being notified that there is something and
// the use of drain(). In short, the caller should always check the
Expand All @@ -186,27 +178,30 @@ func (q *ipQueue[T]) resetAndReturnToPool(elts *[]T) {
func (q *ipQueue[T]) popOne() (T, bool) {
q.Lock()
l := len(q.elts) - q.pos
if l < 1 {
if l == 0 {
q.Unlock()
var empty T
return empty, false
}
e := q.elts[q.pos]
q.pos++
if q.calc != nil {
q.sz -= q.calc(e)
}
l--
if l > 0 {
if l--; l > 0 {
q.pos++
if q.calc != nil {
q.sz -= q.calc(e)
}
// We need to re-signal
select {
case q.ch <- struct{}{}:
default:
}
} else {
// We have just emptied the queue, so we can recycle now.
q.resetAndReturnToPool(&q.elts)
q.elts, q.pos = nil, 0
// We have just emptied the queue, so we can reuse unless it is too big.
if cap(q.elts) <= q.mrs {
q.elts = q.elts[:0]
} else {
q.elts = nil
}
q.pos, q.sz = 0, 0
}
q.Unlock()
return e, true
Expand All @@ -216,25 +211,23 @@ func (q *ipQueue[T]) popOne() (T, bool) {
// a first element is added to the queue.
// This will also decrement the "in progress" count with the length
// of the slice.
// Reason we use pointer to slice instead of slice is explained
// here: https://staticcheck.io/docs/checks#SA6002
// WARNING: The caller MUST never reuse `elts`.
func (q *ipQueue[T]) recycle(elts *[]T) {
// If invoked with a nil list, nothing to do.
if elts == nil || *elts == nil {
return
}
// Update the in progress count.
if len(*elts) > 0 {
if atomic.AddInt64(&q.inprogress, int64(-(len(*elts)))) < 0 {
atomic.StoreInt64(&q.inprogress, 0)
}
atomic.AddInt64(&q.inprogress, int64(-(len(*elts))))
}
// We also don't want to recycle huge slices, so check against the max.
// q.mrs is normally immutable but can be changed, in a safe way, in some tests.
if cap(*elts) > q.mrs {
return
}
q.resetAndReturnToPool(elts)
(*elts) = (*elts)[:0]
q.pool.Put(elts)
}

// Returns the current length of the queue.
Expand All @@ -261,11 +254,7 @@ func (q *ipQueue[T]) drain() {
return
}
q.Lock()
if q.elts != nil {
q.resetAndReturnToPool(&q.elts)
q.elts, q.pos = nil, 0
}
q.sz = 0
q.elts, q.pos, q.sz = nil, 0, 0
// Consume the signal if it was present to reduce the chance of a reader
// routine to be think that there is something in the queue...
select {
Expand Down
Loading

0 comments on commit 1326e6c

Please sign in to comment.