Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bufferpool: Improve performance #603

Merged
merged 1 commit into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 106 additions & 74 deletions exp/bufferpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
package bufferpool

import (
"sync"
"math/bits"

"github.com/colega/zeropool"
)
Expand All @@ -13,35 +13,21 @@ const (
)

// A default global pool.
var Default Pool

// Pool maintains a list of BucketCount buckets that contain buffers
// of exponentially-increasing capacity, 1 << 0 to 1 << BucketCount.
//
// The MinAlloc field specifies the minimum capacity of new buffers
// allocated by Pool, which improves reuse of small buffers. For the
// avoidance of doubt: calls to Get() with size < MinAlloc return a
// buffer of len(buf) = size and cap(buf) >= MinAlloc. MinAlloc MUST
// NOT exceed 1 << BucketCount, or method calls to Pool will panic.
//
// The zero-value Pool is ready to use, defaulting to BucketCount=20
// and MinAlloc=1024 (max size = ~1MiB). Most applications will not
// benefit from tuning these parameters.
// This pool defaults to bucketCount=20 and minAlloc=1024 (max size = ~1MiB).
var Default Pool = *NewPool(defaultMinSize, defaultBucketCount)

// Pool maintains a list of buffers, exponentially increasing in size. Values
// MUST be initialized by NewPool().
//
// As a general rule, increasing MinAlloc reduces GC latency at the
// expense of increased memory usage. Increasing BucketCount can
// reduce GC latency in applications that frequently allocate large
// buffers.
// Buffer instances are safe for concurrent access.
type Pool struct {
once sync.Once
MinAlloc, BucketCount int
buckets bucketSlice
minAlloc int
buckets bucketSlice
}

// Get a buffer of len(buf) == size and cap >= size.
func (p *Pool) Get(size int) []byte {
p.init()

if buf := p.buckets.Get(size); buf != nil {
return buf[:size]
}
Expand All @@ -52,76 +38,122 @@ func (p *Pool) Get(size int) []byte {
// Put returns the buffer to the pool. The first len(buf) bytes
// of the buffer are zeroed.
func (p *Pool) Put(buf []byte) {
p.init()

for i := range buf {
buf[i] = 0
}

// Do not store buffers less than the min alloc size (prevents storing
// buffers that do not conform to the min alloc policy of this pool).
if cap(buf) < p.minAlloc {
return
}

p.buckets.Put(buf[:cap(buf)])
}

func (p *Pool) init() {
p.once.Do(func() {
if p.MinAlloc <= 0 {
p.MinAlloc = defaultMinSize
}

if p.BucketCount <= 0 {
p.BucketCount = defaultBucketCount
}

if p.MinAlloc > (1 << p.BucketCount) {
panic("MinAlloc greater than largest bucket")
}

// Get the index of the bucket responsible for MinAlloc.
var idx int
for idx = range p.buckets {
if 1<<idx >= p.MinAlloc {
break
}
}

p.buckets = make(bucketSlice, p.BucketCount)
for i := range p.buckets {
if i < idx {
// Set the 'New' function for all "small" buckets to
// n.buckets[idx].Get, so as to allow reuse of buffers
// smaller than MinAlloc that are passed to Put, while
// still maximizing reuse of buffers allocated by Get.
// Note that we cannot simply use n.buckets[idx].New,
// as this would side-step pooling.
p.buckets[i] = zeropool.New(p.buckets[idx].Get)
} else {
p.buckets[i] = zeropool.New(newAllocFunc(i))
}
}
})
// NewPool creates a list of BucketCount buckets that contain buffers
// of exponentially-increasing capacity, 1 << 0 to 1 << BucketCount.
//
// The minAlloc field specifies the minimum capacity of new buffers
// allocated by Pool, which improves reuse of small buffers. For the
// avoidance of doubt: calls to Get() with size < minAlloc return a
// buffer of len(buf) = size and cap(buf) >= minAlloc. MinAlloc MUST
// NOT exceed 1 << BucketCount, or method calls to Pool will panic.
//
// Passing zero to the parameters will default bucketCount to 20
// and minAlloc to 1024 (max size = ~1MiB).
//
// As a general rule, increasing MinAlloc reduces GC latency at the
// expense of increased memory usage. Increasing BucketCount can
// reduce GC latency in applications that frequently allocate large
// buffers.
func NewPool(minAlloc, bucketCount int) *Pool {
if minAlloc <= 0 {
minAlloc = defaultMinSize
}

if bucketCount <= 0 {
bucketCount = defaultBucketCount
}

if minAlloc > (1 << bucketCount) {
panic("MinAlloc greater than largest bucket")
}

if !isPowerOf2(minAlloc) {
panic("MinAlloc not a power of two")
}

return &Pool{
minAlloc: minAlloc,
buckets: makeBucketSlice(minAlloc, bucketCount),
}
}

type bucketSlice []*zeropool.Pool[[]byte]

func isPowerOf2(i int) bool {
return i&(i-1) == 0
}

func bucketToGet(size int) int {
i := bits.Len(uint(size))
if isPowerOf2(size) && size > 0 {
// When the size is a power of two, reduce by one (because
// bucket i is for sizes <= 1<< i).
i -= 1
}
return i
}

type bucketSlice []zeropool.Pool[[]byte]
func bucketToPut(size int) int {
i := bits.Len(uint(size))

// Always put on the bucket whose upper bound is size == 1<<i.
i -= 1
return i
}

func (bs bucketSlice) Get(size int) []byte {
for i := range bs {
if 1<<i >= size {
return bs[i].Get()
}
i := bucketToGet(size)
if i < len(bs) {
r := bs[i].Get()
return r
}

return nil
}

func (bs bucketSlice) Put(buf []byte) {
for i := range bs {
if cap(buf) >= 1<<i && cap(buf) < 1<<(i+1) {
bs[i].Put(buf)
break
}
i := bucketToPut(cap(buf))
if i < len(bs) {
bs[i].Put(buf)
}
}

func newAllocFunc(i int) func() []byte {
// makeBucketSlice creates a new bucketSlice with the given parameters. These
// are NOT validated.
func makeBucketSlice(minAlloc, bucketCount int) bucketSlice {
// Create all buckets that are >= the bucket that stores the min
// allocation size.
minBucket := bucketToGet(minAlloc)
buckets := make(bucketSlice, bucketCount)
for i := minBucket; i < bucketCount; i++ {
bp := zeropool.New(newAllocFuncForBucket(i))
buckets[i] = &bp
}

// Buckets smaller than the min bucket size all get/put buffers in the
// minimum bucket size.
for i := 0; i < minBucket; i++ {
buckets[i] = buckets[minBucket]
}

return buckets
}

// newAllocFuncForBucket returns a function to allocate a byte slice of size
// 2^i.
func newAllocFuncForBucket(i int) func() []byte {
return func() []byte {
return make([]byte, 1<<i)
}
Expand Down
96 changes: 96 additions & 0 deletions exp/bufferpool/pool_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package bufferpool

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestBucketIndex(t *testing.T) {
tests := []struct {
size int
get int
put int
}{
// Only sizes that are powers of two are obtained and returned
// to the same bucket.
//
// Sizes that are not a power of two must be fetched by the next
// higher power of two, but are returned to the lower one.
{size: 0, get: 0, put: -1},
{size: 1, get: 0, put: 0},
{size: 26, get: 5, put: 4}, // 26 == 0b00011010
{size: 32, get: 5, put: 5},
{size: 1024, get: 10, put: 10},
{size: 1025, get: 11, put: 10},
}

for i := range tests {
tc := tests[i]
t.Run(fmt.Sprintf("%d", tc.size), func(t *testing.T) {
get := bucketToGet(tc.size)
require.Equal(t, tc.get, get)
put := bucketToPut(tc.size)
require.Equal(t, tc.put, put)
})
}
}

func TestBucketSlice(t *testing.T) {
const minAlloc = 8
const bucketCount = 10
const sizeLastBucket = 1 << (bucketCount - 1)

tests := []struct {
size int
wantLen int
wantCap int
}{{
size: -1, // Negative values are skipped.
wantLen: 0,
wantCap: 0,
}, {
size: 0,
wantLen: minAlloc,
wantCap: minAlloc,
}, {
size: 1,
wantLen: minAlloc,
wantCap: minAlloc,
}, {
size: minAlloc,
wantLen: minAlloc,
wantCap: minAlloc,
}, {
size: minAlloc + 1, // Goes to next bucket.
wantLen: minAlloc * 2,
wantCap: minAlloc * 2,
}, {
size: minAlloc*2 + 1,
wantLen: minAlloc * 4,
wantCap: minAlloc * 4,
}, {
size: sizeLastBucket - 1,
wantLen: sizeLastBucket,
wantCap: sizeLastBucket,
}, {
size: sizeLastBucket,
wantLen: sizeLastBucket,
wantCap: sizeLastBucket,
}, {
size: sizeLastBucket + 1, // Anything > last bucket size is not allocated.
wantLen: 0,
wantCap: 0,
}}

for _, tc := range tests {
t.Run(fmt.Sprintf("%d", tc.size), func(t *testing.T) {
bs := makeBucketSlice(minAlloc, bucketCount)
require.Len(t, bs, bucketCount)
buf := bs.Get(tc.size)
require.Len(t, buf, tc.wantLen)
require.Equal(t, tc.wantCap, cap(buf))
})
}
}
Loading
Loading