From ebc32f7d915ba8f3b40a3666064a86e4c7c56d9e Mon Sep 17 00:00:00 2001
From: Louis Thibault
Date: Sat, 25 Feb 2023 15:19:55 -0500
Subject: [PATCH 01/13] Move arena code to arena.go.

---
 arena.go        | 293 ++++++++++++++++++++++++++++++++++++++++++++++++
 arena_test.go   | 235 ++++++++++++++++++++++++++++++++++++++
 message.go      | 284 ----------------------------------------------
 message_test.go | 223 ------------------------------------
 4 files changed, 528 insertions(+), 507 deletions(-)
 create mode 100644 arena.go
 create mode 100644 arena_test.go

diff --git a/arena.go b/arena.go
new file mode 100644
index 00000000..6e76b33b
--- /dev/null
+++ b/arena.go
@@ -0,0 +1,293 @@
+package capnp
+
+import (
+	"errors"
+	"sync"
+
+	"capnproto.org/go/capnp/v3/exp/bufferpool"
+	"capnproto.org/go/capnp/v3/internal/str"
+)
+
+// An Arena loads and allocates segments for a Message.
+type Arena interface {
+	// NumSegments returns the number of segments in the arena.
+	// This must not be larger than 1<<32.
+	NumSegments() int64
+
+	// Data loads the data for the segment with the given ID. IDs are
+	// in the range [0, NumSegments()) and must be tightly packed in
+	// that range.
+	Data(id SegmentID) ([]byte, error)
+
+	// Allocate selects a segment to place a new object in, creating a
+	// segment or growing the capacity of a previously loaded segment if
+	// necessary. If Allocate does not return an error, then the
+	// difference of the capacity and the length of the returned slice
+	// must be at least minsz. segs is a map of segments keyed by ID
+	// using arrays returned by the Data method (although the length of
+	// these slices may have changed by previous allocations). Allocate
+	// must not modify segs.
+	//
+	// If Allocate creates a new segment, the ID must be one larger than
+	// the last segment's ID, or zero if it is the first segment.
+	//
+	// If Allocate returns a previously loaded segment's ID, then the
+	// arena is responsible for preserving the existing data in the
+	// returned byte slice.
+	Allocate(minsz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error)
+
+	// Release all resources associated with the Arena. Callers MUST NOT
+	// use the Arena after it has been released.
+	//
+	// Calling Release() is OPTIONAL, but may reduce allocations.
+	//
+	// Implementations MAY use Release() as a signal to return resources
+	// to free lists, or otherwise reuse the Arena. However, they MUST
+	// NOT assume Release() will be called.
+	Release()
+}
+
+// SingleSegmentArena is an Arena implementation that stores message data
+// in a contiguous slice. Allocation is performed by first allocating a
+// new slice and copying existing data. A SingleSegmentArena does not
+// fail unless the caller attempts to access another segment.
+type SingleSegmentArena []byte
+
+// SingleSegment constructs a SingleSegmentArena from b. b MAY be nil.
+// Callers MAY use b to populate the segment for reading, or to reserve
+// memory of a specific size.
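(Usage sketch, not part of this patch: the two uses of b described above, exercised through the public capnp API. The no-copy behavior of Unmarshal is quoted from its docstring later in this series.)

	package main

	import capnp "capnproto.org/go/capnp/v3"

	func main() {
		// Write mode: reserve 1 KiB up front so small messages never regrow.
		arena := capnp.SingleSegment(make([]byte, 0, 1024))
		msg, _, err := capnp.NewMessage(arena)
		if err != nil {
			panic(err)
		}

		// Read mode: Unmarshal wraps the encoded bytes in a
		// SingleSegmentArena without copying.
		raw, err := msg.Marshal()
		if err != nil {
			panic(err)
		}
		if _, err := capnp.Unmarshal(raw); err != nil {
			panic(err)
		}
	}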
+func SingleSegment(b []byte) *SingleSegmentArena { + return (*SingleSegmentArena)(&b) +} + +func (ssa SingleSegmentArena) NumSegments() int64 { + return 1 +} + +func (ssa SingleSegmentArena) Data(id SegmentID) ([]byte, error) { + if id != 0 { + return nil, errors.New("segment " + str.Utod(id) + " requested in single segment arena") + } + return ssa, nil +} + +func (ssa *SingleSegmentArena) Allocate(sz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error) { + data := []byte(*ssa) + if segs[0] != nil { + data = segs[0].data + } + if len(data)%int(wordSize) != 0 { + return 0, nil, errors.New("segment size is not a multiple of word size") + } + if hasCapacity(data, sz) { + return 0, data, nil + } + inc, err := nextAlloc(int64(len(data)), int64(maxAllocSize()), sz) + if err != nil { + return 0, nil, err + } + buf := bufferpool.Default.Get(cap(data) + inc) + copied := copy(buf, data) + buf = buf[:copied] + bufferpool.Default.Put(data) + *ssa = buf + return 0, *ssa, nil +} + +func (ssa SingleSegmentArena) String() string { + return "single-segment arena [len=" + str.Itod(len(ssa)) + " cap=" + str.Itod(cap(ssa)) + "]" +} + +// Return this arena to an internal sync.Pool of arenas that can be +// re-used. Any time SingleSegment(nil) is called, arenas from this +// pool will be used if available, which can help reduce memory +// allocations. +// +// All segments will be zeroed before re-use. +// +// Calling Release is optional; if not done the garbage collector +// will release the memory per usual. +func (ssa *SingleSegmentArena) Release() { + bufferpool.Default.Put(*ssa) + *ssa = nil +} + +type roSingleSegment []byte + +func (ss roSingleSegment) NumSegments() int64 { + return 1 +} + +func (ss roSingleSegment) Data(id SegmentID) ([]byte, error) { + if id != 0 { + return nil, errors.New("segment " + str.Utod(id) + " requested in single segment arena") + } + return ss, nil +} + +func (ss roSingleSegment) Allocate(sz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error) { + return 0, nil, errors.New("arena is read-only") +} + +func (ss roSingleSegment) String() string { + return "read-only single-segment arena [len=" + str.Itod(len(ss)) + "]" +} + +func (ss roSingleSegment) Release() {} + +// MultiSegment is an arena that stores object data across multiple []byte +// buffers, allocating new buffers of exponentially-increasing size when +// full. This avoids the potentially-expensive slice copying of SingleSegment. +type MultiSegmentArena [][]byte + +// MultiSegment returns a new arena that allocates new segments when +// they are full. b MAY be nil. Callers MAY use b to populate the +// buffer for reading or to reserve memory of a specific size. +func MultiSegment(b [][]byte) *MultiSegmentArena { + if b == nil { + return multiSegmentPool.Get().(*MultiSegmentArena) + } + return multiSegment(b) +} + +// Return this arena to an internal sync.Pool of arenas that can be +// re-used. Any time MultiSegment(nil) is called, arenas from this +// pool will be used if available, which can help reduce memory +// allocations. +// +// All segments will be zeroed before re-use. +// +// Calling Release is optional; if not done the garbage collector +// will release the memory per usual. 
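(A sketch of the pooling round-trip that the Release method defined next enables; hypothetical example, not part of the diff.)

	package main

	import capnp "capnproto.org/go/capnp/v3"

	func main() {
		arena := capnp.MultiSegment(nil) // may be served from the internal pool
		if _, _, err := capnp.NewMessage(arena); err != nil {
			panic(err)
		}
		// ... build and encode the message ...
		arena.Release() // optional: zero, truncate, and recycle the segments
	}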
+func (msa *MultiSegmentArena) Release() { + for i, v := range *msa { + // Clear the memory, so there's no junk in here for the next use: + for j := range v { + v[j] = 0 + } + + // Truncate the segment, since we use the length as the marker for + // what's allocated: + (*msa)[i] = v[:0] + } + (*msa) = (*msa)[:0] // Hide the segments + multiSegmentPool.Put(msa) +} + +// Like MultiSegment, but doesn't use the pool +func multiSegment(b [][]byte) *MultiSegmentArena { + return (*MultiSegmentArena)(&b) +} + +var multiSegmentPool = sync.Pool{ + New: func() any { + return multiSegment(nil) + }, +} + +// demuxArena slices b into a multi-segment arena. It assumes that +// len(data) >= hdr.totalSize(). +func demuxArena(hdr streamHeader, data []byte) (Arena, error) { + maxSeg := hdr.maxSegment() + if int64(maxSeg) > int64(maxInt-1) { + return nil, errors.New("number of segments overflows int") + } + segs := make([][]byte, int(maxSeg+1)) + for i := range segs { + sz, err := hdr.segmentSize(SegmentID(i)) + if err != nil { + return nil, err + } + segs[i], data = data[:sz:sz], data[sz:] + } + return MultiSegment(segs), nil +} + +func (msa *MultiSegmentArena) NumSegments() int64 { + return int64(len(*msa)) +} + +func (msa *MultiSegmentArena) Data(id SegmentID) ([]byte, error) { + if int64(id) >= int64(len(*msa)) { + return nil, errors.New("segment " + str.Utod(id) + " requested (arena only has " + + str.Itod(len(*msa)) + " segments)") + } + return (*msa)[id], nil +} + +func (msa *MultiSegmentArena) Allocate(sz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error) { + var total int64 + for i := 0; i < cap(*msa); i++ { + if i == len(*msa) { + (*msa) = (*msa)[:i+1] + } + data := (*msa)[i] + id := SegmentID(i) + if s := segs[id]; s != nil { + data = s.data + } + if hasCapacity(data, sz) { + return id, data, nil + } + total += int64(cap(data)) + if total < 0 { + // Overflow. + return 0, nil, errors.New("alloc " + str.Utod(sz) + " bytes: message too large") + } + } + n, err := nextAlloc(total, 1<<63-1, sz) + if err != nil { + return 0, nil, err + } + buf := make([]byte, 0, n) + id := SegmentID(len(*msa)) + *msa = append(*msa, buf) + return id, buf, nil +} + +func (msa *MultiSegmentArena) String() string { + return "multi-segment arena [" + str.Itod(len(*msa)) + " segments]" +} + +// nextAlloc computes how much more space to allocate given the number +// of bytes allocated in the entire message and the requested number of +// bytes. It will always return a multiple of wordSize. max must be a +// multiple of wordSize. The sum of curr and the returned size will +// always be less than max. 
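(A worked example of this growth schedule, phrased as a hypothetical package-internal test; the constants follow from the function defined next.)

	package capnp

	import "testing"

	func TestNextAllocSketch(t *testing.T) {
		// An empty message jumps straight to a 1 KiB segment.
		if n, err := nextAlloc(0, 1<<20, 8); err != nil || n != 1024 {
			t.Fatalf("nextAlloc(0, 1<<20, 8) = %d, %v; want 1024, nil", n, err)
		}

		// Past 1 KiB, each allocation grows the message by roughly 25%.
		if n, err := nextAlloc(1024, 1<<20, 8); err != nil || n != 256 {
			t.Fatalf("nextAlloc(1024, 1<<20, 8) = %d, %v; want 256, nil", n, err)
		}
	}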
+func nextAlloc(curr, max int64, req Size) (int, error) { + if req == 0 { + return 0, nil + } + if req > maxAllocSize() { + return 0, errors.New("alloc " + req.String() + ": too large") + } + padreq := req.padToWord() + want := curr + int64(padreq) + if want <= curr || want > max { + return 0, errors.New("alloc " + req.String() + ": message size overflow") + } + new := curr + double := new + new + switch { + case want < 1024: + next := (1024 - curr + 7) &^ 7 + if next < curr { + return int((curr + 7) &^ 7), nil + } + return int(next), nil + case want > double: + return int(padreq), nil + default: + for 0 < new && new < want { + new += new / 4 + } + if new <= 0 { + return int(padreq), nil + } + delta := new - curr + if delta > int64(maxAllocSize()) { + return int(maxAllocSize()), nil + } + return int((delta + 7) &^ 7), nil + } +} diff --git a/arena_test.go b/arena_test.go new file mode 100644 index 00000000..51267251 --- /dev/null +++ b/arena_test.go @@ -0,0 +1,235 @@ +package capnp + +import ( + "bytes" + "testing" +) + +func TestSingleSegment(t *testing.T) { + t.Parallel() + t.Helper() + + t.Run("FreshArena", func(t *testing.T) { + t.Parallel() + + arena := SingleSegment(nil) + if n := arena.NumSegments(); n != 1 { + t.Errorf("SingleSegment(nil).NumSegments() = %d; want 1", n) + } + data, err := arena.Data(0) + if len(data) != 0 { + t.Errorf("SingleSegment(nil).Data(0) = %#v; want nil", data) + } + if err != nil { + t.Errorf("SingleSegment(nil).Data(0) error: %v", err) + } + _, err = arena.Data(1) + if err == nil { + t.Error("SingleSegment(nil).Data(1) succeeded; want error") + } + }) + + t.Run("ExistingData", func(t *testing.T) { + t.Parallel() + + arena := SingleSegment(incrementingData(8)) + if n := arena.NumSegments(); n != 1 { + t.Errorf("SingleSegment(incrementingData(8)).NumSegments() = %d; want 1", n) + } + data, err := arena.Data(0) + if want := incrementingData(8); !bytes.Equal(data, want) { + t.Errorf("SingleSegment(incrementingData(8)).Data(0) = %#v; want %#v", data, want) + } + if err != nil { + t.Errorf("SingleSegment(incrementingData(8)).Data(0) error: %v", err) + } + _, err = arena.Data(1) + if err == nil { + t.Error("SingleSegment(incrementingData(8)).Data(1) succeeded; want error") + } + }) +} + +func TestSingleSegmentAllocate(t *testing.T) { + t.Parallel() + + tests := []arenaAllocTest{ + { + name: "empty arena", + init: func() (Arena, map[SegmentID]*Segment) { + return SingleSegment(nil), nil + }, + size: 8, + id: 0, + data: []byte{}, + }, + { + name: "unloaded", + init: func() (Arena, map[SegmentID]*Segment) { + buf := incrementingData(24) + return SingleSegment(buf[:16]), nil + }, + size: 8, + id: 0, + data: incrementingData(16), + }, + { + name: "loaded", + init: func() (Arena, map[SegmentID]*Segment) { + buf := incrementingData(24) + buf = buf[:16] + segs := map[SegmentID]*Segment{ + 0: {id: 0, data: buf}, + } + return SingleSegment(buf), segs + }, + size: 8, + id: 0, + data: incrementingData(16), + }, + { + name: "loaded changes length", + init: func() (Arena, map[SegmentID]*Segment) { + buf := incrementingData(32) + segs := map[SegmentID]*Segment{ + 0: {id: 0, data: buf[:24]}, + } + return SingleSegment(buf[:16]), segs + }, + size: 8, + id: 0, + data: incrementingData(24), + }, + { + name: "message-filled segment", + init: func() (Arena, map[SegmentID]*Segment) { + buf := incrementingData(24) + segs := map[SegmentID]*Segment{ + 0: {id: 0, data: buf}, + } + return SingleSegment(buf[:16]), segs + }, + size: 8, + id: 0, + data: incrementingData(24), + }, + } + for i 
:= range tests { + tests[i].run(t, i) + } +} + +func TestMultiSegment(t *testing.T) { + t.Parallel() + t.Helper() + + t.Run("FreshArena", func(t *testing.T) { + t.Parallel() + + arena := MultiSegment(nil) + if n := arena.NumSegments(); n != 0 { + t.Errorf("MultiSegment(nil).NumSegments() = %d; want 1", n) + } + _, err := arena.Data(0) + if err == nil { + t.Error("MultiSegment(nil).Data(0) succeeded; want error") + } + }) + + t.Run("ExistingData", func(t *testing.T) { + t.Parallel() + + arena := MultiSegment([][]byte{incrementingData(8), incrementingData(24)}) + if n := arena.NumSegments(); n != 2 { + t.Errorf("MultiSegment(...).NumSegments() = %d; want 2", n) + } + data, err := arena.Data(0) + if want := incrementingData(8); !bytes.Equal(data, want) { + t.Errorf("MultiSegment(...).Data(0) = %#v; want %#v", data, want) + } + if err != nil { + t.Errorf("MultiSegment(...).Data(0) error: %v", err) + } + data, err = arena.Data(1) + if want := incrementingData(24); !bytes.Equal(data, want) { + t.Errorf("MultiSegment(...).Data(1) = %#v; want %#v", data, want) + } + if err != nil { + t.Errorf("MultiSegment(...).Data(1) error: %v", err) + } + _, err = arena.Data(2) + if err == nil { + t.Error("MultiSegment(...).Data(2) succeeded; want error") + } + }) +} + +func TestMultiSegmentAllocate(t *testing.T) { + t.Parallel() + + tests := []arenaAllocTest{ + { + name: "empty arena", + init: func() (Arena, map[SegmentID]*Segment) { + return MultiSegment(nil), nil + }, + size: 8, + id: 0, + data: []byte{}, + }, + { + name: "space in unloaded segment", + init: func() (Arena, map[SegmentID]*Segment) { + buf := incrementingData(24) + return MultiSegment([][]byte{buf[:16]}), nil + }, + size: 8, + id: 0, + data: incrementingData(16), + }, + { + name: "space in loaded segment", + init: func() (Arena, map[SegmentID]*Segment) { + buf := incrementingData(24) + buf = buf[:16] + segs := map[SegmentID]*Segment{ + 0: {id: 0, data: buf}, + } + return MultiSegment([][]byte{buf}), segs + }, + size: 8, + id: 0, + data: incrementingData(16), + }, + { + name: "space in loaded segment changes length", + init: func() (Arena, map[SegmentID]*Segment) { + buf := incrementingData(32) + segs := map[SegmentID]*Segment{ + 0: {id: 0, data: buf[:24]}, + } + return MultiSegment([][]byte{buf[:16]}), segs + }, + size: 8, + id: 0, + data: incrementingData(24), + }, + { + name: "message-filled segment", + init: func() (Arena, map[SegmentID]*Segment) { + buf := incrementingData(24) + segs := map[SegmentID]*Segment{ + 0: {id: 0, data: buf}, + } + return MultiSegment([][]byte{buf[:16]}), segs + }, + size: 8, + id: 1, + data: []byte{}, + }, + } + + for i := range tests { + tests[i].run(t, i) + } +} diff --git a/message.go b/message.go index a5faa112..e4297429 100644 --- a/message.go +++ b/message.go @@ -384,290 +384,6 @@ func (wc *writeCounter) Write(b []byte) (n int, err error) { return } -// An Arena loads and allocates segments for a Message. -type Arena interface { - // NumSegments returns the number of segments in the arena. - // This must not be larger than 1<<32. - NumSegments() int64 - - // Data loads the data for the segment with the given ID. IDs are in - // the range [0, NumSegments()). - // must be tightly packed in the range [0, NumSegments()). - Data(id SegmentID) ([]byte, error) - - // Allocate selects a segment to place a new object in, creating a - // segment or growing the capacity of a previously loaded segment if - // necessary. 
If Allocate does not return an error, then the - // difference of the capacity and the length of the returned slice - // must be at least minsz. segs is a map of segments keyed by ID - // using arrays returned by the Data method (although the length of - // these slices may have changed by previous allocations). Allocate - // must not modify segs. - // - // If Allocate creates a new segment, the ID must be one larger than - // the last segment's ID or zero if it is the first segment. - // - // If Allocate returns an previously loaded segment's ID, then the - // arena is responsible for preserving the existing data in the - // returned byte slice. - Allocate(minsz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error) - - // Release all resources associated with the Arena. Callers MUST NOT - // use the Arena after it has been released. - // - // Calling Release() is OPTIONAL, but may reduce allocations. - // - // Implementations MAY use Release() as a signal to return resources - // to free lists, or otherwise reuse the Arena. However, they MUST - // NOT assume Release() will be called. - Release() -} - -// SingleSegmentArena is an Arena implementation that stores message data -// in a continguous slice. Allocation is performed by first allocating a -// new slice and copying existing data. SingleSegment arena does not fail -// unless the caller attempts to access another segment. -type SingleSegmentArena []byte - -// SingleSegment constructs a SingleSegmentArena from b. b MAY be nil. -// Callers MAY use b to populate the segment for reading, or to reserve -// memory of a specific size. -func SingleSegment(b []byte) *SingleSegmentArena { - return (*SingleSegmentArena)(&b) -} - -func (ssa SingleSegmentArena) NumSegments() int64 { - return 1 -} - -func (ssa SingleSegmentArena) Data(id SegmentID) ([]byte, error) { - if id != 0 { - return nil, errors.New("segment " + str.Utod(id) + " requested in single segment arena") - } - return ssa, nil -} - -func (ssa *SingleSegmentArena) Allocate(sz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error) { - data := []byte(*ssa) - if segs[0] != nil { - data = segs[0].data - } - if len(data)%int(wordSize) != 0 { - return 0, nil, errors.New("segment size is not a multiple of word size") - } - if hasCapacity(data, sz) { - return 0, data, nil - } - inc, err := nextAlloc(int64(len(data)), int64(maxAllocSize()), sz) - if err != nil { - return 0, nil, err - } - buf := bufferpool.Default.Get(cap(data) + inc) - copied := copy(buf, data) - buf = buf[:copied] - bufferpool.Default.Put(data) - *ssa = buf - return 0, *ssa, nil -} - -func (ssa SingleSegmentArena) String() string { - return "single-segment arena [len=" + str.Itod(len(ssa)) + " cap=" + str.Itod(cap(ssa)) + "]" -} - -// Return this arena to an internal sync.Pool of arenas that can be -// re-used. Any time SingleSegment(nil) is called, arenas from this -// pool will be used if available, which can help reduce memory -// allocations. -// -// All segments will be zeroed before re-use. -// -// Calling Release is optional; if not done the garbage collector -// will release the memory per usual. 
-func (ssa *SingleSegmentArena) Release() { - bufferpool.Default.Put(*ssa) - *ssa = nil -} - -type roSingleSegment []byte - -func (ss roSingleSegment) NumSegments() int64 { - return 1 -} - -func (ss roSingleSegment) Data(id SegmentID) ([]byte, error) { - if id != 0 { - return nil, errors.New("segment " + str.Utod(id) + " requested in single segment arena") - } - return ss, nil -} - -func (ss roSingleSegment) Allocate(sz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error) { - return 0, nil, errors.New("arena is read-only") -} - -func (ss roSingleSegment) String() string { - return "read-only single-segment arena [len=" + str.Itod(len(ss)) + "]" -} - -func (ss roSingleSegment) Release() {} - -// MultiSegment is an arena that stores object data across multiple []byte -// buffers, allocating new buffers of exponentially-increasing size when -// full. This avoids the potentially-expensive slice copying of SingleSegment. -type MultiSegmentArena [][]byte - -// MultiSegment returns a new arena that allocates new segments when -// they are full. b MAY be nil. Callers MAY use b to populate the -// buffer for reading or to reserve memory of a specific size. -func MultiSegment(b [][]byte) *MultiSegmentArena { - if b == nil { - return multiSegmentPool.Get().(*MultiSegmentArena) - } - return multiSegment(b) -} - -// Return this arena to an internal sync.Pool of arenas that can be -// re-used. Any time MultiSegment(nil) is called, arenas from this -// pool will be used if available, which can help reduce memory -// allocations. -// -// All segments will be zeroed before re-use. -// -// Calling Release is optional; if not done the garbage collector -// will release the memory per usual. -func (msa *MultiSegmentArena) Release() { - for i, v := range *msa { - // Clear the memory, so there's no junk in here for the next use: - for j := range v { - v[j] = 0 - } - - // Truncate the segment, since we use the length as the marker for - // what's allocated: - (*msa)[i] = v[:0] - } - (*msa) = (*msa)[:0] // Hide the segments - multiSegmentPool.Put(msa) -} - -// Like MultiSegment, but doesn't use the pool -func multiSegment(b [][]byte) *MultiSegmentArena { - return (*MultiSegmentArena)(&b) -} - -var multiSegmentPool = sync.Pool{ - New: func() any { - return multiSegment(nil) - }, -} - -// demuxArena slices b into a multi-segment arena. It assumes that -// len(data) >= hdr.totalSize(). 
-func demuxArena(hdr streamHeader, data []byte) (Arena, error) { - maxSeg := hdr.maxSegment() - if int64(maxSeg) > int64(maxInt-1) { - return nil, errors.New("number of segments overflows int") - } - segs := make([][]byte, int(maxSeg+1)) - for i := range segs { - sz, err := hdr.segmentSize(SegmentID(i)) - if err != nil { - return nil, err - } - segs[i], data = data[:sz:sz], data[sz:] - } - return MultiSegment(segs), nil -} - -func (msa *MultiSegmentArena) NumSegments() int64 { - return int64(len(*msa)) -} - -func (msa *MultiSegmentArena) Data(id SegmentID) ([]byte, error) { - if int64(id) >= int64(len(*msa)) { - return nil, errors.New("segment " + str.Utod(id) + " requested (arena only has " + - str.Itod(len(*msa)) + " segments)") - } - return (*msa)[id], nil -} - -func (msa *MultiSegmentArena) Allocate(sz Size, segs map[SegmentID]*Segment) (SegmentID, []byte, error) { - var total int64 - for i := 0; i < cap(*msa); i++ { - if i == len(*msa) { - (*msa) = (*msa)[:i+1] - } - data := (*msa)[i] - id := SegmentID(i) - if s := segs[id]; s != nil { - data = s.data - } - if hasCapacity(data, sz) { - return id, data, nil - } - total += int64(cap(data)) - if total < 0 { - // Overflow. - return 0, nil, errors.New("alloc " + str.Utod(sz) + " bytes: message too large") - } - } - n, err := nextAlloc(total, 1<<63-1, sz) - if err != nil { - return 0, nil, err - } - buf := make([]byte, 0, n) - id := SegmentID(len(*msa)) - *msa = append(*msa, buf) - return id, buf, nil -} - -func (msa *MultiSegmentArena) String() string { - return "multi-segment arena [" + str.Itod(len(*msa)) + " segments]" -} - -// nextAlloc computes how much more space to allocate given the number -// of bytes allocated in the entire message and the requested number of -// bytes. It will always return a multiple of wordSize. max must be a -// multiple of wordSize. The sum of curr and the returned size will -// always be less than max. -func nextAlloc(curr, max int64, req Size) (int, error) { - if req == 0 { - return 0, nil - } - if req > maxAllocSize() { - return 0, errors.New("alloc " + req.String() + ": too large") - } - padreq := req.padToWord() - want := curr + int64(padreq) - if want <= curr || want > max { - return 0, errors.New("alloc " + req.String() + ": message size overflow") - } - new := curr - double := new + new - switch { - case want < 1024: - next := (1024 - curr + 7) &^ 7 - if next < curr { - return int((curr + 7) &^ 7), nil - } - return int(next), nil - case want > double: - return int(padreq), nil - default: - for 0 < new && new < want { - new += new / 4 - } - if new <= 0 { - return int(padreq), nil - } - delta := new - curr - if delta > int64(maxAllocSize()) { - return int(maxAllocSize()), nil - } - return int((delta + 7) &^ 7), nil - } -} - // A Decoder represents a framer that deserializes a particular Cap'n // Proto input stream. 
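(For orientation, the typical encode/decode round-trip through the Decoder declared below; a sketch against the public API, not part of this diff.)

	package main

	import (
		"bytes"

		capnp "capnproto.org/go/capnp/v3"
	)

	func main() {
		msg, _, err := capnp.NewMessage(capnp.SingleSegment(nil))
		if err != nil {
			panic(err)
		}

		var buf bytes.Buffer
		if err := capnp.NewEncoder(&buf).Encode(msg); err != nil {
			panic(err)
		}
		if _, err := capnp.NewDecoder(&buf).Decode(); err != nil {
			panic(err)
		}
	}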
type Decoder struct { diff --git a/message_test.go b/message_test.go index bf9a3f8f..5cc0423b 100644 --- a/message_test.go +++ b/message_test.go @@ -170,229 +170,6 @@ func TestAlloc(t *testing.T) { } } -func TestSingleSegment(t *testing.T) { - t.Parallel() - - // fresh arena - { - arena := SingleSegment(nil) - if n := arena.NumSegments(); n != 1 { - t.Errorf("SingleSegment(nil).NumSegments() = %d; want 1", n) - } - data, err := arena.Data(0) - if len(data) != 0 { - t.Errorf("SingleSegment(nil).Data(0) = %#v; want nil", data) - } - if err != nil { - t.Errorf("SingleSegment(nil).Data(0) error: %v", err) - } - _, err = arena.Data(1) - if err == nil { - t.Error("SingleSegment(nil).Data(1) succeeded; want error") - } - } - - // existing data - { - arena := SingleSegment(incrementingData(8)) - if n := arena.NumSegments(); n != 1 { - t.Errorf("SingleSegment(incrementingData(8)).NumSegments() = %d; want 1", n) - } - data, err := arena.Data(0) - if want := incrementingData(8); !bytes.Equal(data, want) { - t.Errorf("SingleSegment(incrementingData(8)).Data(0) = %#v; want %#v", data, want) - } - if err != nil { - t.Errorf("SingleSegment(incrementingData(8)).Data(0) error: %v", err) - } - _, err = arena.Data(1) - if err == nil { - t.Error("SingleSegment(incrementingData(8)).Data(1) succeeded; want error") - } - } -} - -func TestSingleSegmentAllocate(t *testing.T) { - t.Parallel() - - tests := []arenaAllocTest{ - { - name: "empty arena", - init: func() (Arena, map[SegmentID]*Segment) { - return SingleSegment(nil), nil - }, - size: 8, - id: 0, - data: []byte{}, - }, - { - name: "unloaded", - init: func() (Arena, map[SegmentID]*Segment) { - buf := incrementingData(24) - return SingleSegment(buf[:16]), nil - }, - size: 8, - id: 0, - data: incrementingData(16), - }, - { - name: "loaded", - init: func() (Arena, map[SegmentID]*Segment) { - buf := incrementingData(24) - buf = buf[:16] - segs := map[SegmentID]*Segment{ - 0: {id: 0, data: buf}, - } - return SingleSegment(buf), segs - }, - size: 8, - id: 0, - data: incrementingData(16), - }, - { - name: "loaded changes length", - init: func() (Arena, map[SegmentID]*Segment) { - buf := incrementingData(32) - segs := map[SegmentID]*Segment{ - 0: {id: 0, data: buf[:24]}, - } - return SingleSegment(buf[:16]), segs - }, - size: 8, - id: 0, - data: incrementingData(24), - }, - { - name: "message-filled segment", - init: func() (Arena, map[SegmentID]*Segment) { - buf := incrementingData(24) - segs := map[SegmentID]*Segment{ - 0: {id: 0, data: buf}, - } - return SingleSegment(buf[:16]), segs - }, - size: 8, - id: 0, - data: incrementingData(24), - }, - } - for i := range tests { - tests[i].run(t, i) - } -} - -func TestMultiSegment(t *testing.T) { - t.Parallel() - - // fresh arena - { - arena := MultiSegment(nil) - if n := arena.NumSegments(); n != 0 { - t.Errorf("MultiSegment(nil).NumSegments() = %d; want 1", n) - } - _, err := arena.Data(0) - if err == nil { - t.Error("MultiSegment(nil).Data(0) succeeded; want error") - } - } - - // existing data - { - arena := MultiSegment([][]byte{incrementingData(8), incrementingData(24)}) - if n := arena.NumSegments(); n != 2 { - t.Errorf("MultiSegment(...).NumSegments() = %d; want 2", n) - } - data, err := arena.Data(0) - if want := incrementingData(8); !bytes.Equal(data, want) { - t.Errorf("MultiSegment(...).Data(0) = %#v; want %#v", data, want) - } - if err != nil { - t.Errorf("MultiSegment(...).Data(0) error: %v", err) - } - data, err = arena.Data(1) - if want := incrementingData(24); !bytes.Equal(data, want) { - 
t.Errorf("MultiSegment(...).Data(1) = %#v; want %#v", data, want) - } - if err != nil { - t.Errorf("MultiSegment(...).Data(1) error: %v", err) - } - _, err = arena.Data(2) - if err == nil { - t.Error("MultiSegment(...).Data(2) succeeded; want error") - } - } -} - -func TestMultiSegmentAllocate(t *testing.T) { - t.Parallel() - - tests := []arenaAllocTest{ - { - name: "empty arena", - init: func() (Arena, map[SegmentID]*Segment) { - return MultiSegment(nil), nil - }, - size: 8, - id: 0, - data: []byte{}, - }, - { - name: "space in unloaded segment", - init: func() (Arena, map[SegmentID]*Segment) { - buf := incrementingData(24) - return MultiSegment([][]byte{buf[:16]}), nil - }, - size: 8, - id: 0, - data: incrementingData(16), - }, - { - name: "space in loaded segment", - init: func() (Arena, map[SegmentID]*Segment) { - buf := incrementingData(24) - buf = buf[:16] - segs := map[SegmentID]*Segment{ - 0: {id: 0, data: buf}, - } - return MultiSegment([][]byte{buf}), segs - }, - size: 8, - id: 0, - data: incrementingData(16), - }, - { - name: "space in loaded segment changes length", - init: func() (Arena, map[SegmentID]*Segment) { - buf := incrementingData(32) - segs := map[SegmentID]*Segment{ - 0: {id: 0, data: buf[:24]}, - } - return MultiSegment([][]byte{buf[:16]}), segs - }, - size: 8, - id: 0, - data: incrementingData(24), - }, - { - name: "message-filled segment", - init: func() (Arena, map[SegmentID]*Segment) { - buf := incrementingData(24) - segs := map[SegmentID]*Segment{ - 0: {id: 0, data: buf}, - } - return MultiSegment([][]byte{buf[:16]}), segs - }, - size: 8, - id: 1, - data: []byte{}, - }, - } - - for i := range tests { - tests[i].run(t, i) - } -} - type serializeTest struct { name string segs [][]byte From 96a5cd1eea346fc1be024a10320c5b4603ef9b18 Mon Sep 17 00:00:00 2001 From: Louis Thibault Date: Sat, 25 Feb 2023 12:14:51 -0500 Subject: [PATCH 02/13] Remove unused buffer pool buckets & improve buffer reuse. --- exp/bufferpool/pool.go | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/exp/bufferpool/pool.go b/exp/bufferpool/pool.go index a75ccaa7..c7f4d1a7 100644 --- a/exp/bufferpool/pool.go +++ b/exp/bufferpool/pool.go @@ -4,7 +4,8 @@ package bufferpool import "sync" const ( - nBuckets = 20 + minSize = 1024 + nBuckets = 11 // max 1Mb ) // A default global pool. @@ -18,33 +19,29 @@ type Pool struct { // Get a buffer of the given length. Its capacity may be larger than the // requested size. func (p *Pool) Get(size int) []byte { - for i := 0; i < nBuckets; i++ { - capacity := 1 << i - if capacity >= size { - var ret []byte - item := p.buckets[i].Get() - if item == nil { - ret = make([]byte, capacity) - } else { - ret = item.([]byte) + for i := range p.buckets { + if capacity := minSize << i; capacity >= size { + if item := p.buckets[i].Get(); item != nil { + return item.([]byte)[:size] } - ret = ret[:size] - return ret + + return make([]byte, size, capacity) } } + return make([]byte, size) } // Return a buffer to the pool. Zeros the slice (but not the full backing array) // before making it available for future use. 
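(Usage sketch for the pool as it stands after this patch; Default is the package-level pool shown above.)

	package main

	import "capnproto.org/go/capnp/v3/exp/bufferpool"

	func main() {
		buf := bufferpool.Default.Get(100) // len 100; cap rounded up to 1024
		// ... use buf ...
		bufferpool.Default.Put(buf) // zeroes buf before pooling; bucket
		// selection here is the bug addressed by the next two patches
	}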
 func (p *Pool) Put(buf []byte) {
-	for i := 0; i < len(buf); i++ {
+	for i := range buf {
 		buf[i] = 0
 	}
 
 	capacity := cap(buf)
-	for i := 0; i < nBuckets; i++ {
-		if (1 << i) == capacity {
+	for i := range p.buckets {
+		if (1 << i) >= capacity {
 			p.buckets[i].Put(buf[:capacity])
 			return
 		}

From bde05b5ff4e9cf86293f8f7fd1dd009ac667e375 Mon Sep 17 00:00:00 2001
From: Louis Thibault
Date: Sat, 25 Feb 2023 12:20:44 -0500
Subject: [PATCH 03/13] Bugfix: return buffer to proper bucket.

---
 exp/bufferpool/pool.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/exp/bufferpool/pool.go b/exp/bufferpool/pool.go
index c7f4d1a7..27c545be 100644
--- a/exp/bufferpool/pool.go
+++ b/exp/bufferpool/pool.go
@@ -41,7 +41,10 @@ func (p *Pool) Put(buf []byte) {
 
 	capacity := cap(buf)
 	for i := range p.buckets {
-		if (1 << i) >= capacity {
+		bucket := (1 << i)
+		next := (1 << (i + 1))
+
+		if bucket <= capacity && capacity < next {
 			p.buckets[i].Put(buf[:capacity])
 			return
 		}

From: Louis Thibault
Date: Sat, 25 Feb 2023 12:21:53 -0500
Subject: [PATCH 04/13] Bugfix: compute bucket size based on minSize.

---
 exp/bufferpool/pool.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/exp/bufferpool/pool.go b/exp/bufferpool/pool.go
index 27c545be..1fa547a9 100644
--- a/exp/bufferpool/pool.go
+++ b/exp/bufferpool/pool.go
@@ -41,8 +41,8 @@ func (p *Pool) Put(buf []byte) {
 
 	capacity := cap(buf)
 	for i := range p.buckets {
-		bucket := (1 << i)
-		next := (1 << (i + 1))
+		bucket := (minSize << i)
+		next := (minSize << (i + 1))
 
 		if bucket <= capacity && capacity < next {
 			p.buckets[i].Put(buf[:capacity])

From: Louis Thibault
Date: Sat, 25 Feb 2023 15:06:36 -0500
Subject: [PATCH 05/13] Support user-defined pool behavior. Improve buffer
 reuse in default pool.

---
 capability.go               |  12 +--
 exp/bufferpool/pool.go      | 168 +++++++++++++++++++++++++++++-------
 exp/bufferpool/pool_test.go |  66 ++++++++++++++
 message.go                  |  22 +----
 rpc/transport/transport.go  |   5 +-
 5 files changed, 214 insertions(+), 59 deletions(-)
 create mode 100644 exp/bufferpool/pool_test.go

diff --git a/capability.go b/capability.go
index 226973ff..39506fc2 100644
--- a/capability.go
+++ b/capability.go
@@ -162,10 +162,10 @@ func (c Client) setupLeakReporting(creatorFunc int) {
 	}
 	c.creatorFunc = creatorFunc
 	_, c.creatorFile, c.creatorLine, _ = runtime.Caller(2)
-	buf := bufferpool.Default.Get(1e6)
+	buf := bufferpool.Get(1e6)
 	n := runtime.Stack(buf, false)
 	c.creatorStack = string(buf[:n])
-	bufferpool.Default.Put(buf)
+	bufferpool.Put(buf)
 	c.setFinalizer()
 }
 
@@ -397,10 +397,10 @@ func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) {
 
 // SendStreamCall is like SendCall except that:
 //
-// 1. It does not return an answer for the eventual result.
-// 2. If the call returns an error, all future calls on this
-//    client will return the same error (without starting
-//    the method or calling PlaceArgs).
+//  1. It does not return an answer for the eventual result.
+//  2. If the call returns an error, all future calls on this
+//     client will return the same error (without starting
+//     the method or calling PlaceArgs).
 func (c Client) SendStreamCall(ctx context.Context, s Send) error {
 	var streamError error
 	syncutil.With(&c.mu, func() {
diff --git a/exp/bufferpool/pool.go b/exp/bufferpool/pool.go
index 1fa547a9..28b1b549 100644
--- a/exp/bufferpool/pool.go
+++ b/exp/bufferpool/pool.go
@@ -1,52 +1,162 @@
 // Package bufferpool supports object pooling for byte buffers.
 package bufferpool
 
-import "sync"
+import (
+	"sync"
+)
 
 const (
-	minSize  = 1024
-	nBuckets = 11 // max 1Mb
+	defaultMinSize     = 1024
+	defaultBucketCount = 10
 )
 
-// A default global pool.
-var Default Pool
+var pool Pool = &BasicPool{}
+
+// Set the global pool. This can be used to override the default implementation
+// to optimize for different allocation profiles. For example, applications that
+// target embedded platforms may wish to lower the minimum buffer size from 1KiB
+// to 512B, or less.
+//
+// The default implementation uses BasicPool, which is suitable for a wide range
+// of applications. Most applications will not benefit from using a custom pool,
+// so be sure to profile.
+func Set(p Pool) {
+	pool = p
+}
+
+// Get a buffer of the specified size from the global pool. The
+// resulting buffer's capacity MAY exceed size. The buffer SHOULD NOT
+// be resized beyond its underlying capacity, or it will leak to the
+// garbage collector. Callers MAY wish to over-allocate if they intend
+// to append to the buffer.
+func Get(size int) []byte {
+	return pool.Get(size)[:size]
+}
+
+// Put returns the buffer to the global pool. The slice will be zeroed
+// before returning it to the pool. Note that only the first len(buf)
+// bytes are zeroed, and not the full backing array.
+//
+// TODO: should we zero out the entire array? Users may have resized
+// the slice, so it may contain sensitive data.
+func Put(buf []byte) {
+	for i := range buf {
+		buf[i] = 0
+	}
+
+	pool.Put(buf[:cap(buf)])
+}
+
+// Pool is a free list of []byte buffers of heterogeneous sizes.
+// Implementations MUST be thread safe and MUST support efficient
+// paging of buffers with arbitrary capacity. Most applications
+// SHOULD use BasicPool.
+type Pool interface {
+	// Get a buffer of len >= size. The caller will resize the
+	// buffer to size, so implementations need not bother.
+	Get(size int) []byte
+
+	// Put returns the buffer to the pool. The slice will be
+	// zeroed and resized to its underlying capacity before it
+	// is passed to Put.
+	Put([]byte)
+}
 
-// A pool of buffers, in variable sizes.
-type Pool struct {
-	buckets [nBuckets]sync.Pool
+// BasicPool is a general-purpose implementation of Pool, using
+// sync.Pool. BasicPool maintains a set of N buckets containing
+// buffers of exponentially-increasing size, starting from Min.
+// Buffers whose capacity exceeds Min << N are ignored and left
+// to the garbage-collector.
+//
+// The zero-value BasicPool is ready to use, defaulting to N=10
+// and Min=1024 (1KiB - ~1MiB buffer sizes). Most applications
+// will not benefit from tuning these parameters.
+//
+// As a general rule, increasing Min reduces GC latency at the
+// expense of increased memory usage. Increasing N can reduce
+// GC latency in applications that frequently allocate buffers
+// larger than 1MiB.
+type BasicPool struct {
+	once    sync.Once
+	Min, N  int
+	buckets []bucket
 }
 
-// Get a buffer of the given length. Its capacity may be larger than the
-// requested size.
-func (p *Pool) Get(size int) []byte {
-	for i := range p.buckets {
-		if capacity := minSize << i; capacity >= size {
-			if item := p.buckets[i].Get(); item != nil {
-				return item.([]byte)[:size]
-			}
+func (p *BasicPool) Get(size int) []byte {
+	p.init()
 
-			return make([]byte, size, capacity)
+	for _, b := range p.buckets {
+		if b.Fits(size) {
+			return b.Get()
 		}
 	}
 
 	return make([]byte, size)
 }
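(Configuration sketch using only the names introduced above; the 512-byte figure echoes the embedded-platform example in the Set docstring.)

	package main

	import "capnproto.org/go/capnp/v3/exp/bufferpool"

	func main() {
		// Shrink the minimum bucket for a constrained target:
		// buckets then span 512<<0 through 512<<7 bytes.
		bufferpool.Set(&bufferpool.BasicPool{Min: 512, N: 8})

		buf := bufferpool.Get(64) // len 64; served from the 512-byte bucket
		defer bufferpool.Put(buf)
	}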
 
-// Return a buffer to the pool. Zeros the slice (but not the full backing array)
-// before making it available for future use.
-func (p *Pool) Put(buf []byte) {
-	for i := range buf {
-		buf[i] = 0
-	}
-
-	capacity := cap(buf)
-	for i := range p.buckets {
-		bucket := (minSize << i)
-		next := (minSize << (i + 1))
-
-		if bucket <= capacity && capacity < next {
-			p.buckets[i].Put(buf[:capacity])
-			return
-		}
-	}
-}
+// Return a buffer to the pool.
+func (p *BasicPool) Put(buf []byte) {
+	p.init()
+
+	for _, b := range p.buckets {
+		if b.TryPut(buf) {
+			return
+		}
+	}
+}
+
+// init sets any unset fields to their defaults and builds the
+// bucket list. Successive calls are no-ops.
+func (p *BasicPool) init() {
+	p.once.Do(func() {
+		if p.Min <= 0 {
+			p.Min = defaultMinSize
+		}
+
+		if p.N <= 0 {
+			p.N = defaultBucketCount
+		}
+
+		p.buckets = make([]bucket, p.N)
+		for i := 0; i < p.N; i++ {
+			p.buckets[i] = newBucket(p.Min, i)
+		}
+	})
+}
+
+// bucket is a free list of buffers whose capacities fall in the
+// half-open interval [minCap, capLimit).
+type bucket struct {
+	minCap, capLimit int
+	pool             *sync.Pool
+}
+
+func newBucket(min, n int) bucket {
+	minCap := min << n
+
+	return bucket{
+		minCap:   minCap,
+		capLimit: min << (n + 1),
+		pool: &sync.Pool{
+			New: func() any {
+				return make([]byte, minCap)
+			},
+		},
+	}
+}
+
+// Fits returns true if the bucket's buffers can hold size bytes,
+// i.e. if b.minCap >= size.
+func (b bucket) Fits(size int) bool {
+	return b.minCap >= size
+}
+
+// Get a buffer. The buffer's capacity is guaranteed to be at
+// least b.minCap, and smaller than b.capLimit.
+func (b bucket) Get() []byte {
+	return b.pool.Get().([]byte)
+}
+
+// TryPut returns the buffer to the underlying sync.Pool if the
+// buffer's capacity satisfies the bucket's constraints.
+//
+// Returns true if the buffer was put back in the pool.
+func (b bucket) TryPut(buf []byte) (ok bool) {
+	if ok = b.minCap <= cap(buf) && b.capLimit > cap(buf); ok {
+		b.pool.Put(buf)
+	}
+
+	return
+}
diff --git a/exp/bufferpool/pool_test.go b/exp/bufferpool/pool_test.go
new file mode 100644
index 00000000..27d3d067
--- /dev/null
+++ b/exp/bufferpool/pool_test.go
@@ -0,0 +1,66 @@
+package bufferpool
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestBasicBuffer(t *testing.T) {
+	t.Parallel()
+
+	assert.Len(t, Get(32), 32, "should return buffer with 32-byte length")
+	assert.Equal(t, 1024, cap(Get(32)), "should return buffer with 1KiB capacity")
+
+	assert.Len(t, Get(1025), 1025, "should return buffer with 1025-byte length")
+	assert.Equal(t, 2048, cap(Get(1025)), "should return buffer with 2KiB capacity")
+}
+
+func TestBucket(t *testing.T) {
+	t.Parallel()
+	t.Helper()
+
+	t.Run("Fits", func(t *testing.T) {
+		t.Parallel()
+
+		// 0th bucket
+		bucket := newBucket(32, 0)
+		assert.True(t, bucket.Fits(32), "should fit 32-byte buffer")
+		assert.True(t, bucket.Fits(8), "should fit buffer smaller than min. capacity")
+		assert.False(t, bucket.Fits(33), "should not fit buffer larger than min. capacity")
+
+		// 1st bucket
+		bucket = newBucket(32, 1)
+		assert.True(t, bucket.Fits(32<<1), "should fit 32-byte buffer")
+		assert.True(t, bucket.Fits(8<<1), "should fit buffer smaller than min. capacity")
+		assert.False(t, bucket.Fits(33<<1), "should not fit buffer larger than min. capacity")
+	})
+
+	t.Run("Get", func(t *testing.T) {
+		t.Parallel()
+
+		// 0th bucket
+		bucket := newBucket(32, 0)
+		assert.Len(t, bucket.Get(), 32, "should allocate new buffers with min. capacity")
+
+		// 1st bucket
+		bucket = newBucket(32, 1)
+		assert.Len(t, bucket.Get(), 32<<1, "should allocate new buffers with min. capacity")
+	})
+
+	t.Run("TryPut", func(t *testing.T) {
+		t.Parallel()
+
+		// 0th bucket
+		bucket := newBucket(32, 0)
+		assert.True(t, bucket.TryPut(make([]byte, 32)), "should consume buffer with exactly min. capacity")
+		assert.True(t, bucket.TryPut(make([]byte, 63)), "should consume buffer with cap < limit")
+		assert.False(t, bucket.TryPut(make([]byte, 64)), "should not consume buffer with cap == limit")
+
+		// 1st bucket
+		bucket = newBucket(32, 1)
+		assert.True(t, bucket.TryPut(make([]byte, 32<<1)), "should consume buffer with exactly min. capacity")
+		assert.True(t, bucket.TryPut(make([]byte, 63<<1)), "should consume buffer with cap < limit")
+		assert.False(t, bucket.TryPut(make([]byte, 64<<1)), "should not consume buffer with cap == limit")
+	})
+}
diff --git a/message.go b/message.go
index e4297429..341dfb86 100644
--- a/message.go
+++ b/message.go
@@ -392,8 +392,6 @@ type Decoder struct {
 	wordbuf [wordSize]byte
 	hdrbuf  []byte
 
-	bufferPool *bufferpool.Pool
-
 	reuse bool
 	buf   []byte
 	msg   Message
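(The Decode hunk below rewires the decoder to the global pool; a consuming loop would then look roughly like this. Note that ReleaseMessage is superseded by Message.Release in the final patch of this series.)

	package main

	import (
		"os"

		capnp "capnproto.org/go/capnp/v3"
	)

	func main() {
		dec := capnp.NewDecoder(os.Stdin)
		for {
			msg, err := dec.Decode()
			if err != nil {
				break // io.EOF once the stream is drained
			}
			// ... process msg ...
			dec.ReleaseMessage(msg) // hand the decode buffer back to the global pool
		}
	}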
@@ -468,12 +466,7 @@ func (d *Decoder) Decode() (*Message, error) {
 
 	// Read segments.
 	if !d.reuse {
-		var buf []byte
-		if d.bufferPool == nil {
-			buf = make([]byte, int(total))
-		} else {
-			buf = d.bufferPool.Get(int(total))
-		}
+		buf := bufferpool.Get(int(total))
 		if _, err := io.ReadFull(d.r, buf); err != nil {
 			return nil, exc.WrapError("decode: read segments", err)
 		}
@@ -526,19 +519,8 @@ func (d *Decoder) ReuseBuffer() {
 	d.reuse = true
 }
 
-// SetBufferPool registers a buffer pool to allocate message space from, rather
-// than directly allocating buffers with make(). This can help reduce pressure
-// on the garbage collector; pass messages to d.ReleaseMessage() when done with
-// them.
-func (d *Decoder) SetBufferPool(p *bufferpool.Pool) {
-	d.bufferPool = p
-}
-
 func (d *Decoder) ReleaseMessage(m *Message) {
-	if d.bufferPool == nil {
-		return
-	}
-	d.bufferPool.Put(m.originalBuffer)
+	bufferpool.Put(m.originalBuffer)
 }
 
 // Unmarshal reads an unpacked serialized stream into a message. No
diff --git a/rpc/transport/transport.go b/rpc/transport/transport.go
index 9fff096c..d43b71a7 100644
--- a/rpc/transport/transport.go
+++ b/rpc/transport/transport.go
@@ -12,7 +12,6 @@ import (
 
 	capnp "capnproto.org/go/capnp/v3"
 	"capnproto.org/go/capnp/v3/exc"
-	"capnproto.org/go/capnp/v3/exp/bufferpool"
 	rpccp "capnproto.org/go/capnp/v3/std/capnp/rpc"
 )
 
@@ -199,13 +198,11 @@ type streamCodec struct {
 }
 
 func newStreamCodec(rwc io.ReadWriteCloser, f streamEncoding) *streamCodec {
-	ret := &streamCodec{
+	return &streamCodec{
 		Decoder: f.NewDecoder(rwc),
 		Encoder: f.NewEncoder(rwc),
 		Closer:  rwc,
 	}
-	ret.SetBufferPool(&bufferpool.Default)
-	return ret
 }
 
 type streamEncoding interface {

From 45875b1cfe77cab40387fe7be536aef1fe4cedd2 Mon Sep 17 00:00:00 2001
From: Louis Thibault
Date: Sat, 25 Feb 2023 15:24:12 -0500
Subject: [PATCH 06/13] Small formatting fix.

---
 exp/bufferpool/pool.go | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/exp/bufferpool/pool.go b/exp/bufferpool/pool.go
index 28b1b549..e79b573e 100644
--- a/exp/bufferpool/pool.go
+++ b/exp/bufferpool/pool.go
@@ -1,9 +1,7 @@
 // Package bufferpool supports object pooling for byte buffers.
 package bufferpool
 
-import (
-	"sync"
-)
+import "sync"
 
 const (
 	defaultMinSize     = 1024

From 8b221d6d54cdc019bf81d48b237c4fd8ba4a24ee Mon Sep 17 00:00:00 2001
From: Louis Thibault
Date: Sat, 25 Feb 2023 15:25:53 -0500
Subject: [PATCH 07/13] Simplify docstring for bufferpool.Get.

---
 exp/bufferpool/pool.go | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/exp/bufferpool/pool.go b/exp/bufferpool/pool.go
index e79b573e..d4823318 100644
--- a/exp/bufferpool/pool.go
+++ b/exp/bufferpool/pool.go
@@ -25,8 +25,7 @@ func Set(p Pool) {
 // Get a buffer of the specified size from the global pool. The
 // resulting buffer's capacity MAY exceed size. The buffer SHOULD NOT
 // be resized beyond its underlying capacity, or it will leak to the
-// garbage collector. Callers MAY wish to over-allocate if they intend
-// to append to the buffer.
+// garbage collector.
 func Get(size int) []byte {
 	return pool.Get(size)[:size]
 }

From ad2994cb3327c6db4b78cab864efd10dd9803263 Mon Sep 17 00:00:00 2001
From: Louis Thibault
Date: Sat, 25 Feb 2023 15:29:01 -0500
Subject: [PATCH 08/13] Correct docstring for BasicPool.
--- exp/bufferpool/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exp/bufferpool/pool.go b/exp/bufferpool/pool.go index d4823318..aee15f69 100644 --- a/exp/bufferpool/pool.go +++ b/exp/bufferpool/pool.go @@ -72,7 +72,7 @@ type Pool interface { // As a general rule, increasing Min reduces GC latency at the // expense of increased memory usage. Increasing N can reduce // GC latency in applications that frequently allocate buffers -// larger than 1MiB. +// of size >=1MiB. type BasicPool struct { once sync.Once Min, N int From 83a9323c1fb833ef82d552185c3e8c749d1b8346 Mon Sep 17 00:00:00 2001 From: Louis Thibault Date: Sat, 25 Feb 2023 15:29:59 -0500 Subject: [PATCH 09/13] Remove unneeded docstring on BasicPool.Put. --- exp/bufferpool/pool.go | 1 - 1 file changed, 1 deletion(-) diff --git a/exp/bufferpool/pool.go b/exp/bufferpool/pool.go index aee15f69..616887c8 100644 --- a/exp/bufferpool/pool.go +++ b/exp/bufferpool/pool.go @@ -91,7 +91,6 @@ func (p *BasicPool) Get(size int) []byte { return make([]byte, size) } -// Return a buffer to the pool. func (p *BasicPool) Put(buf []byte) { p.init() From 1f45a0a84d328da42a0cd04cb0b447a53c457ad1 Mon Sep 17 00:00:00 2001 From: Louis Thibault Date: Sat, 25 Feb 2023 15:31:01 -0500 Subject: [PATCH 10/13] Prefer range loop over explicit integer incrementation. --- exp/bufferpool/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exp/bufferpool/pool.go b/exp/bufferpool/pool.go index 616887c8..34594d98 100644 --- a/exp/bufferpool/pool.go +++ b/exp/bufferpool/pool.go @@ -112,7 +112,7 @@ func (p *BasicPool) init() { } p.buckets = make([]bucket, p.N) - for i := 0; i < p.N; i++ { + for i := range p.buckets { p.buckets[i] = newBucket(p.Min, i) } }) From fbb5d6767f3a7e213babad22dd4b74025b9eff4a Mon Sep 17 00:00:00 2001 From: Louis Thibault Date: Sat, 25 Feb 2023 00:45:01 -0500 Subject: [PATCH 11/13] Allocate SingleSegmentArena from bufferpool. --- arena.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/arena.go b/arena.go index 6e76b33b..1ddd1689 100644 --- a/arena.go +++ b/arena.go @@ -86,10 +86,10 @@ func (ssa *SingleSegmentArena) Allocate(sz Size, segs map[SegmentID]*Segment) (S if err != nil { return 0, nil, err } - buf := bufferpool.Default.Get(cap(data) + inc) + buf := bufferpool.Get(cap(data) + inc) copied := copy(buf, data) buf = buf[:copied] - bufferpool.Default.Put(data) + bufferpool.Put(data) *ssa = buf return 0, *ssa, nil } @@ -108,7 +108,7 @@ func (ssa SingleSegmentArena) String() string { // Calling Release is optional; if not done the garbage collector // will release the memory per usual. func (ssa *SingleSegmentArena) Release() { - bufferpool.Default.Put(*ssa) + bufferpool.Put(*ssa) *ssa = nil } From d998c0980246e2953e9bf9b1fd9e9c950e7a26d6 Mon Sep 17 00:00:00 2001 From: Louis Thibault Date: Sat, 25 Feb 2023 16:27:41 -0500 Subject: [PATCH 12/13] Move demuxArena to message.go, where it is used. --- arena.go | 18 ------------------ message.go | 18 ++++++++++++++++++ 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/arena.go b/arena.go index 1ddd1689..ef503536 100644 --- a/arena.go +++ b/arena.go @@ -185,24 +185,6 @@ var multiSegmentPool = sync.Pool{ }, } -// demuxArena slices b into a multi-segment arena. It assumes that -// len(data) >= hdr.totalSize(). 
-func demuxArena(hdr streamHeader, data []byte) (Arena, error) { - maxSeg := hdr.maxSegment() - if int64(maxSeg) > int64(maxInt-1) { - return nil, errors.New("number of segments overflows int") - } - segs := make([][]byte, int(maxSeg+1)) - for i := range segs { - sz, err := hdr.segmentSize(SegmentID(i)) - if err != nil { - return nil, err - } - segs[i], data = data[:sz:sz], data[sz:] - } - return MultiSegment(segs), nil -} - func (msa *MultiSegmentArena) NumSegments() int64 { return int64(len(*msa)) } diff --git a/message.go b/message.go index 341dfb86..e360eaf5 100644 --- a/message.go +++ b/message.go @@ -762,4 +762,22 @@ func hasCapacity(b []byte, sz Size) bool { return sz <= Size(cap(b)-len(b)) } +// demuxArena slices b into a multi-segment arena. It assumes that +// len(data) >= hdr.totalSize(). +func demuxArena(hdr streamHeader, data []byte) (Arena, error) { + maxSeg := hdr.maxSegment() + if int64(maxSeg) > int64(maxInt-1) { + return nil, errors.New("number of segments overflows int") + } + segs := make([][]byte, int(maxSeg+1)) + for i := range segs { + sz, err := hdr.segmentSize(SegmentID(i)) + if err != nil { + return nil, err + } + segs[i], data = data[:sz:sz], data[sz:] + } + return MultiSegment(segs), nil +} + const maxInt = int(^uint(0) >> 1) From 786bf12298ed7e897b0634d2ba4c83d776cd0e1d Mon Sep 17 00:00:00 2001 From: Louis Thibault Date: Sat, 25 Feb 2023 16:31:36 -0500 Subject: [PATCH 13/13] Pool capnp.Message. --- answer_test.go | 6 ++--- answerqueue.go | 26 ++++++++----------- message.go | 51 +++++++++++++++++++++++++++----------- rpc/transport/pipe.go | 2 -- rpc/transport/transport.go | 19 +++----------- server/server.go | 8 ++---- 6 files changed, 57 insertions(+), 55 deletions(-) diff --git a/answer_test.go b/answer_test.go index 24fe4ac8..3eab970b 100644 --- a/answer_test.go +++ b/answer_test.go @@ -58,7 +58,7 @@ func TestPromiseFulfill(t *testing.T) { p := NewPromise(dummyMethod, dummyPipelineCaller{}) done := p.Answer().Done() msg, seg, _ := NewMessage(SingleSegment(nil)) - defer msg.Reset(nil) + defer msg.Release() res, _ := NewStruct(seg, ObjectSize{DataSize: 8}) p.Fulfill(res.ToPtr()) select { @@ -73,7 +73,7 @@ func TestPromiseFulfill(t *testing.T) { defer p.ReleaseClients() ans := p.Answer() msg, seg, _ := NewMessage(SingleSegment(nil)) - defer msg.Reset(nil) + defer msg.Release() res, _ := NewStruct(seg, ObjectSize{DataSize: 8}) res.SetUint32(0, 0xdeadbeef) p.Fulfill(res.ToPtr()) @@ -96,7 +96,7 @@ func TestPromiseFulfill(t *testing.T) { c := NewClient(h) defer c.Release() msg, seg, _ := NewMessage(SingleSegment(nil)) - defer msg.Reset(nil) + defer msg.Release() res, _ := NewStruct(seg, ObjectSize{PointerCount: 3}) res.SetPtr(1, NewInterface(seg, msg.AddCap(c.AddRef())).ToPtr()) diff --git a/answerqueue.go b/answerqueue.go index e85ff9a7..f221e7e4 100644 --- a/answerqueue.go +++ b/answerqueue.go @@ -14,12 +14,12 @@ import ( // // An AnswerQueue can be in one of three states: // -// 1) Queueing. Incoming method calls will be added to the queue. -// 2) Draining, entered by calling Fulfill or Reject. Queued method -// calls will be delivered in sequence, and new incoming method calls -// will block until the AnswerQueue enters the Drained state. -// 3) Drained, entered once all queued methods have been delivered. -// Incoming methods are passthrough. +// 1. Queueing. Incoming method calls will be added to the queue. +// 2. Draining, entered by calling Fulfill or Reject. 
Queued method +// calls will be delivered in sequence, and new incoming method calls +// will block until the AnswerQueue enters the Drained state. +// 3. Drained, entered once all queued methods have been delivered. +// Incoming methods are passthrough. type AnswerQueue struct { method Method draining chan struct{} // closed while exiting queueing state @@ -168,7 +168,7 @@ func (qc queueCaller) PipelineSend(ctx context.Context, transform []PipelineOp, return ErrorAnswer(s.Method, err), func() {} } r.ReleaseArgs = func() { - r.Args.Message().Reset(nil) + r.Args.Message().Release() } } else { r.ReleaseArgs = func() {} @@ -257,8 +257,8 @@ func (sr *StructReturner) ReleaseResults() { if !alloced { return } - if err != nil && msg != nil { - msg.Reset(nil) + if err != nil { + msg.Release() // nil-safe } } @@ -279,9 +279,7 @@ func (sr *StructReturner) Answer(m Method, pcall PipelineCaller) (*Answer, Relea msg := sr.result.Message() sr.result = Struct{} sr.mu.Unlock() - if msg != nil { - msg.Reset(nil) - } + msg.Release() // nil-safe } } sr.p = NewPromise(m, pcall) @@ -293,8 +291,6 @@ func (sr *StructReturner) Answer(m Method, pcall PipelineCaller) (*Answer, Relea sr.result = Struct{} sr.mu.Unlock() sr.p.ReleaseClients() - if msg != nil { - msg.Reset(nil) - } + msg.Release() // nil-safe } } diff --git a/message.go b/message.go index e360eaf5..ef1610bc 100644 --- a/message.go +++ b/message.go @@ -37,10 +37,6 @@ type Message struct { Arena Arena - // If not nil, the original buffer from which this message was decoded. - // This mostly for the benefit of returning buffers to pools and such. - originalBuffer []byte - // CapTable is the indexed list of the clients referenced in the // message. Capability pointers inside the message will use this table // to map pointers to Clients. The table is usually populated by the @@ -74,9 +70,9 @@ type Message struct { // NewMessage creates a message with a new root and returns the first // segment. It is an error to call NewMessage on an arena with data in it. func NewMessage(arena Arena) (*Message, *Segment, error) { - var msg Message + msg := msgpool.Get() first, err := msg.Reset(arena) - return &msg, first, err + return msg, first, err } // NewSingleSegmentMessage(b) is equivalent to NewMessage(SingleSegment(b)), except @@ -100,6 +96,20 @@ func NewMultiSegmentMessage(b [][]byte) (msg *Message, first *Segment) { return msg, first } +// Release resources acquired by the Message and return it +// to a global message pool, where it can be reused. The +// message's arena is also released. Calling Release() is +// OPTIONAL. +// +// Note that this will reset the message, invalidating any +// pointers and releasing any clients in the cap table. +func (m *Message) Release() { + if m != nil { + m.Reset(nil) + msgpool.Put(m) + } +} + // Reset the message to use a different arena, allowing it // to be reused. 
This invalidates any existing pointers in // the Message, and releases all clients in the cap table, @@ -109,6 +119,10 @@ func (m *Message) Reset(arena Arena) (first *Segment, err error) { c.Release() } + if m.Arena != nil { + m.Arena.Release() + } + *m = Message{ Arena: arena, TraverseLimit: m.TraverseLimit, @@ -474,10 +488,7 @@ func (d *Decoder) Decode() (*Message, error) { if err != nil { return nil, exc.WrapError("decode", err) } - return &Message{ - Arena: arena, - originalBuffer: buf, - }, nil + return &Message{Arena: arena}, nil } d.buf = resizeSlice(d.buf, int(total)) if _, err := io.ReadFull(d.r, d.buf); err != nil { @@ -519,10 +530,6 @@ func (d *Decoder) ReuseBuffer() { d.reuse = true } -func (d *Decoder) ReleaseMessage(m *Message) { - bufferpool.Put(m.originalBuffer) -} - // Unmarshal reads an unpacked serialized stream into a message. No // copying is performed, so the objects in the returned message read // directly from data. @@ -781,3 +788,19 @@ func demuxArena(hdr streamHeader, data []byte) (Arena, error) { } const maxInt = int(^uint(0) >> 1) + +var msgpool messagePool + +type messagePool sync.Pool + +func (p *messagePool) Get() *Message { + if v := (*sync.Pool)(p).Get(); v != nil { + return v.(*Message) + } + + return new(Message) +} + +func (p *messagePool) Put(m *Message) { + (*sync.Pool)(p).Put(m) +} diff --git a/rpc/transport/pipe.go b/rpc/transport/pipe.go index d3b37380..3ae202cf 100644 --- a/rpc/transport/pipe.go +++ b/rpc/transport/pipe.go @@ -72,8 +72,6 @@ func (p *pipe) Decode() (*capnp.Message, error) { } -func (*pipe) ReleaseMessage(*capnp.Message) {} - func (p *pipe) Close() error { close(p.closed) syncutil.With(&p.sendMu, func() { diff --git a/rpc/transport/transport.go b/rpc/transport/transport.go index d43b71a7..8b3f546a 100644 --- a/rpc/transport/transport.go +++ b/rpc/transport/transport.go @@ -71,10 +71,6 @@ type IncomingMessage struct { type Codec interface { Encode(*capnp.Message) error Decode() (*capnp.Message, error) - - // Mark a message previously returned by Decode as no longer needed. The - // Codec may re-use the space for future messages. 
- ReleaseMessage(*capnp.Message) Close() error } @@ -136,13 +132,10 @@ func (s *transport) NewMessage() (OutgoingMessage, error) { } release := func() { - if alreadyReleased { - return + if !alreadyReleased { + alreadyReleased = true + msg.Release() } - alreadyReleased = true - - msg.Reset(nil) - arena.Release() } return OutgoingMessage{ @@ -167,13 +160,9 @@ func (s *transport) RecvMessage() (IncomingMessage, error) { return IncomingMessage{}, err } - release := func() { - msg.Reset(nil) - s.c.ReleaseMessage(msg) - } return IncomingMessage{ Message: rmsg, - Release: release, + Release: msg.Release, }, nil } diff --git a/server/server.go b/server/server.go index 50548356..2deb88dc 100644 --- a/server/server.go +++ b/server/server.go @@ -149,7 +149,7 @@ func (srv *Server) Send(ctx context.Context, s capnp.Send) (*capnp.Answer, capnp Args: args, ReleaseArgs: func() { if msg := args.Message(); msg != nil { - msg.Reset(nil) + msg.Release() args = capnp.Struct{} } }, @@ -279,7 +279,7 @@ func sendArgsToStruct(s capnp.Send) (capnp.Struct, error) { return capnp.Struct{}, err } if err := s.PlaceArgs(st); err != nil { - st.Message().Reset(nil) + st.Message().Release() return capnp.Struct{}, exc.WrapError("place args", err) } return st, nil @@ -321,10 +321,6 @@ func (sm sortedMethods) Swap(i, j int) { sm[i], sm[j] = sm[j], sm[i] } -type resultsAllocer interface { - AllocResults(capnp.ObjectSize) (capnp.Struct, error) -} - func newError(msg string) error { return exc.New(exc.Failed, "capnp server", msg) }
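(End-state usage sketch: with the full series applied, the message lifecycle looks like this.)

	package main

	import capnp "capnproto.org/go/capnp/v3"

	func main() {
		// NewMessage now draws from a global sync.Pool of Messages.
		msg, _, err := capnp.NewMessage(capnp.SingleSegment(nil))
		if err != nil {
			panic(err)
		}
		// Release is optional: it resets the message, releases the
		// arena, and returns both to their pools for reuse.
		defer msg.Release()

		// ... build, marshal, or send the message ...
	}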