Merge pull request #603 from matheusd/bufferpool
bufferpool: Improve performance
lthibault authored Jan 11, 2025
2 parents f9295d9 + 1c35ade commit c559d94
Showing 3 changed files with 223 additions and 95 deletions.
180 changes: 106 additions & 74 deletions exp/bufferpool/pool.go
@@ -2,7 +2,7 @@
package bufferpool

import (
"sync"
"math/bits"

"github.com/colega/zeropool"
)
@@ -13,35 +13,21 @@
)

// A default global pool.
var Default Pool

// Pool maintains a list of BucketCount buckets that contain buffers
// of exponentially-increasing capacity, 1 << 0 to 1 << BucketCount.
//
// The MinAlloc field specifies the minimum capacity of new buffers
// allocated by Pool, which improves reuse of small buffers. For the
// avoidance of doubt: calls to Get() with size < MinAlloc return a
// buffer of len(buf) = size and cap(buf) >= MinAlloc. MinAlloc MUST
// NOT exceed 1 << BucketCount, or method calls to Pool will panic.
//
// The zero-value Pool is ready to use, defaulting to BucketCount=20
// and MinAlloc=1024 (max size = ~1MiB). Most applications will not
// benefit from tuning these parameters.
// This pool defaults to bucketCount=20 and minAlloc=1024 (max size = ~1MiB).
var Default Pool = *NewPool(defaultMinSize, defaultBucketCount)
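// Sketch (not part of this change): a typical round trip through the
// package-level Default pool. The helper name and the request size 4096
// are illustrative.
func exampleDefaultPool() {
	buf := Default.Get(4096) // len(buf) == 4096, cap(buf) >= 4096
	_ = buf                  // ... fill and use buf ...
	Default.Put(buf)         // zeroes buf and recycles it for later Gets
}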

// Pool maintains a list of buffers, exponentially increasing in size. Values
// MUST be initialized by NewPool().
//
// As a general rule, increasing MinAlloc reduces GC latency at the
// expense of increased memory usage. Increasing BucketCount can
// reduce GC latency in applications that frequently allocate large
// buffers.
// Pool instances are safe for concurrent use.
type Pool struct {
once sync.Once
MinAlloc, BucketCount int
buckets bucketSlice
minAlloc int
buckets bucketSlice
}

// Get a buffer of len(buf) == size and cap >= size.
func (p *Pool) Get(size int) []byte {
p.init()

if buf := p.buckets.Get(size); buf != nil {
return buf[:size]
}
@@ -52,76 +38,122 @@ func (p *Pool) Get(size int) []byte {
// Put returns the buffer to the pool. The first len(buf) bytes
// of the buffer are zeroed.
func (p *Pool) Put(buf []byte) {
p.init()

for i := range buf {
buf[i] = 0
}

// Do not store buffers less than the min alloc size (prevents storing
// buffers that do not conform to the min alloc policy of this pool).
if cap(buf) < p.minAlloc {
return
}

p.buckets.Put(buf[:cap(buf)])
}
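// Sketch (not part of this change): the Put policy in practice. Put always
// zeroes the buffer, but only retains it when cap(buf) >= the pool's
// minAlloc, so undersized foreign slices are dropped rather than pooled.
// The helper name and sizes are illustrative.
func examplePutPolicy(p *Pool) {
	small := make([]byte, 16) // below the default minAlloc of 1024
	p.Put(small)              // zeroed, then discarded: cap(small) < minAlloc

	big := p.Get(2048) // served from (or allocated for) the 2048-byte bucket
	p.Put(big)         // zeroed and returned to that bucket for reuse
}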

func (p *Pool) init() {
p.once.Do(func() {
if p.MinAlloc <= 0 {
p.MinAlloc = defaultMinSize
}

if p.BucketCount <= 0 {
p.BucketCount = defaultBucketCount
}

if p.MinAlloc > (1 << p.BucketCount) {
panic("MinAlloc greater than largest bucket")
}

// Get the index of the bucket responsible for MinAlloc.
var idx int
for idx = range p.buckets {
if 1<<idx >= p.MinAlloc {
break
}
}

p.buckets = make(bucketSlice, p.BucketCount)
for i := range p.buckets {
if i < idx {
// Set the 'New' function for all "small" buckets to
// n.buckets[idx].Get, so as to allow reuse of buffers
// smaller than MinAlloc that are passed to Put, while
// still maximizing reuse of buffers allocated by Get.
// Note that we cannot simply use n.buckets[idx].New,
// as this would side-step pooling.
p.buckets[i] = zeropool.New(p.buckets[idx].Get)
} else {
p.buckets[i] = zeropool.New(newAllocFunc(i))
}
}
})
// NewPool creates a list of bucketCount buckets that contain buffers
// of exponentially increasing capacity, 1 << 0 to 1 << (bucketCount-1).
//
// The minAlloc parameter specifies the minimum capacity of new buffers
// allocated by Pool, which improves reuse of small buffers. For the
// avoidance of doubt: calls to Get() with size < minAlloc return a
// buffer of len(buf) = size and cap(buf) >= minAlloc. minAlloc MUST
// be a power of two and MUST NOT exceed 1 << bucketCount, or NewPool
// will panic.
//
// Passing zero for either parameter defaults bucketCount to 20 and
// minAlloc to 1024.
//
// As a general rule, increasing minAlloc reduces GC latency at the
// expense of increased memory usage. Increasing bucketCount can
// reduce GC latency in applications that frequently allocate large
// buffers.
func NewPool(minAlloc, bucketCount int) *Pool {
if minAlloc <= 0 {
minAlloc = defaultMinSize
}

if bucketCount <= 0 {
bucketCount = defaultBucketCount
}

if minAlloc > (1 << bucketCount) {
panic("MinAlloc greater than largest bucket")
}

if !isPowerOf2(minAlloc) {
panic("MinAlloc not a power of two")
}

return &Pool{
minAlloc: minAlloc,
buckets: makeBucketSlice(minAlloc, bucketCount),
}
}
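// Sketch (not part of this change): constructing a custom pool. minAlloc
// must be a power of two and must not exceed 1<<bucketCount, otherwise
// NewPool panics; passing zero for either argument selects the defaults
// (1024 and 20). The parameter values below are illustrative.
func exampleNewPool() *Pool {
	// Pools buffers from 4 KiB up to 1<<15 bytes; smaller Gets are served
	// from the 4 KiB bucket.
	return NewPool(4096, 16)
}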

type bucketSlice []*zeropool.Pool[[]byte]

func isPowerOf2(i int) bool {
return i&(i-1) == 0
}

func bucketToGet(size int) int {
i := bits.Len(uint(size))
if isPowerOf2(size) && size > 0 {
// When the size is a power of two, reduce by one (because
// bucket i is for sizes <= 1<<i).
i -= 1
}
return i
}

type bucketSlice []zeropool.Pool[[]byte]
func bucketToPut(size int) int {
i := bits.Len(uint(size))

// Always put into the largest bucket whose buffer size 1<<i does not exceed size.
i -= 1
return i
}
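// Sketch (not part of this change): how the two index helpers differ for
// sizes that are not powers of two. A 26-byte request must be served by
// bucket 5 (32-byte buffers), while a 26-byte-capacity buffer can only
// refill bucket 4 (16-byte buffers), since it is too small for bucket 5.
// Powers of two map to the same bucket in both directions.
func exampleBucketIndices() {
	_ = bucketToGet(26) // == 5: smallest i with 1<<i >= 26
	_ = bucketToPut(26) // == 4: largest i with 1<<i <= 26
	_ = bucketToGet(32) // == 5
	_ = bucketToPut(32) // == 5
}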

func (bs bucketSlice) Get(size int) []byte {
for i := range bs {
if 1<<i >= size {
return bs[i].Get()
}
i := bucketToGet(size)
if i < len(bs) {
r := bs[i].Get()
return r
}

return nil
}

func (bs bucketSlice) Put(buf []byte) {
for i := range bs {
if cap(buf) >= 1<<i && cap(buf) < 1<<(i+1) {
bs[i].Put(buf)
break
}
i := bucketToPut(cap(buf))
if i < len(bs) {
bs[i].Put(buf)
}
}

func newAllocFunc(i int) func() []byte {
// makeBucketSlice creates a new bucketSlice with the given parameters. These
// are NOT validated.
func makeBucketSlice(minAlloc, bucketCount int) bucketSlice {
// Create all buckets that are >= the bucket that stores the min
// allocation size.
minBucket := bucketToGet(minAlloc)
buckets := make(bucketSlice, bucketCount)
for i := minBucket; i < bucketCount; i++ {
bp := zeropool.New(newAllocFuncForBucket(i))
buckets[i] = &bp
}

// Buckets smaller than the min bucket size all get/put buffers in the
// minimum bucket size.
for i := 0; i < minBucket; i++ {
buckets[i] = buckets[minBucket]
}

return buckets
}
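// Sketch (not part of this change): buckets below the minimum-allocation
// index all share the zeropool instance of the minAlloc bucket, so any Get
// for a size <= minAlloc draws from, and returns to, the same pool of
// minAlloc-sized buffers. The parameters below mirror the package defaults.
func exampleBucketAliasing() {
	bs := makeBucketSlice(1024, 20)
	_ = bs[0] == bs[10] // true: indices 0 through 10 all point at the 1024-byte bucket
}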

// newAllocFuncForBucket returns a function to allocate a byte slice of size
// 2^i.
func newAllocFuncForBucket(i int) func() []byte {
return func() []byte {
return make([]byte, 1<<i)
}
96 changes: 96 additions & 0 deletions exp/bufferpool/pool_internal_test.go
@@ -0,0 +1,96 @@
package bufferpool

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestBucketIndex(t *testing.T) {
tests := []struct {
size int
get int
put int
}{
// Only sizes that are powers of two are obtained and returned
// to the same bucket.
//
// Sizes that are not a power of two must be fetched by the next
// higher power of two, but are returned to the lower one.
{size: 0, get: 0, put: -1},
{size: 1, get: 0, put: 0},
{size: 26, get: 5, put: 4}, // 26 == 0b00011010
{size: 32, get: 5, put: 5},
{size: 1024, get: 10, put: 10},
{size: 1025, get: 11, put: 10},
}

for i := range tests {
tc := tests[i]
t.Run(fmt.Sprintf("%d", tc.size), func(t *testing.T) {
get := bucketToGet(tc.size)
require.Equal(t, tc.get, get)
put := bucketToPut(tc.size)
require.Equal(t, tc.put, put)
})
}
}

func TestBucketSlice(t *testing.T) {
const minAlloc = 8
const bucketCount = 10
const sizeLastBucket = 1 << (bucketCount - 1)

tests := []struct {
size int
wantLen int
wantCap int
}{{
size: -1, // Negative values are skipped.
wantLen: 0,
wantCap: 0,
}, {
size: 0,
wantLen: minAlloc,
wantCap: minAlloc,
}, {
size: 1,
wantLen: minAlloc,
wantCap: minAlloc,
}, {
size: minAlloc,
wantLen: minAlloc,
wantCap: minAlloc,
}, {
size: minAlloc + 1, // Goes to next bucket.
wantLen: minAlloc * 2,
wantCap: minAlloc * 2,
}, {
size: minAlloc*2 + 1,
wantLen: minAlloc * 4,
wantCap: minAlloc * 4,
}, {
size: sizeLastBucket - 1,
wantLen: sizeLastBucket,
wantCap: sizeLastBucket,
}, {
size: sizeLastBucket,
wantLen: sizeLastBucket,
wantCap: sizeLastBucket,
}, {
size: sizeLastBucket + 1, // Anything > last bucket size is not allocated.
wantLen: 0,
wantCap: 0,
}}

for _, tc := range tests {
t.Run(fmt.Sprintf("%d", tc.size), func(t *testing.T) {
bs := makeBucketSlice(minAlloc, bucketCount)
require.Len(t, bs, bucketCount)
buf := bs.Get(tc.size)
require.Len(t, buf, tc.wantLen)
require.Equal(t, tc.wantCap, cap(buf))
})
}
}
