From 2e3e0f6e9167bb9aa597dbd3fec97893e82a347f Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Mon, 5 Aug 2024 22:35:53 -0400 Subject: [PATCH] Add option to reuse buffers for interleaved frames --- client.go | 8 +++++++- pkg/base/interleaved_frame.go | 9 +++++++-- pkg/conn/conn.go | 17 ++++++++++++++++- pkg/conn/conn_test.go | 32 ++++++++++++++++++++++++++++++++ 4 files changed, 62 insertions(+), 4 deletions(-) diff --git a/client.go b/client.go index 8c3235eb..e720712c 100644 --- a/client.go +++ b/client.go @@ -251,6 +251,8 @@ type Client struct { BytesReceived *uint64 // pointer to a variable that stores sent bytes. BytesSent *uint64 + // enable reuse of buffers for incoming packets. + BufferReuseEnable bool // // system functions (all optional) @@ -883,7 +885,11 @@ func (c *Client) connOpen() error { c.nconn = nconn bc := bytecounter.New(c.nconn, c.BytesReceived, c.BytesSent) - c.conn = conn.NewConn(bc) + if c.BufferReuseEnable { + c.conn = conn.NewConn(bc, conn.ConnOptionFrameBufferReuseEnable(true)) + } else { + c.conn = conn.NewConn(bc) + } c.reader = &clientReader{ c: c, } diff --git a/pkg/base/interleaved_frame.go b/pkg/base/interleaved_frame.go index 88aee8d0..7db8fcae 100644 --- a/pkg/base/interleaved_frame.go +++ b/pkg/base/interleaved_frame.go @@ -37,8 +37,13 @@ func (f *InterleavedFrame) Unmarshal(br *bufio.Reader) error { payloadLen := int(uint16(header[2])<<8 | uint16(header[3])) f.Channel = int(header[1]) - f.Payload = make([]byte, payloadLen) - + if cap(f.Payload) < payloadLen { + // if there's not enough space, extend the buffer + f.Payload = append(f.Payload[:cap(f.Payload)], make([]byte, payloadLen-cap(f.Payload))...) + } else { + // otherwise, set the len of the buffer to the payloadLen + f.Payload = f.Payload[:payloadLen] + } _, err = io.ReadFull(br, f.Payload) return err } diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index 59d63978..c9b2c0a9 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -19,16 +19,28 @@ type Conn struct { // reuse interleaved frames. they should never be passed to secondary routines fr base.InterleavedFrame + + frameBufferReuseEnable bool } // NewConn allocates a Conn. -func NewConn(rw io.ReadWriter) *Conn { +func NewConn(rw io.ReadWriter, opts ...ConnOption) *Conn { return &Conn{ w: rw, br: bufio.NewReaderSize(rw, readBufferSize), } } +// ConnOption is an option for Conn. +type ConnOption func(c *Conn) + +// ConnOptionFrameBufferReuseEnable enables buffer reuse. +func ConnOptionFrameBufferReuseEnable(v bool) ConnOption { + return func(c *Conn) { + c.frameBufferReuseEnable = v + } +} + // Read reads a Request, a Response or an Interleaved frame. func (c *Conn) Read() (interface{}, error) { byts, err := c.br.Peek(2) @@ -63,6 +75,9 @@ func (c *Conn) ReadResponse() (*base.Response, error) { // ReadInterleavedFrame reads a InterleavedFrame. func (c *Conn) ReadInterleavedFrame() (*base.InterleavedFrame, error) { + if !c.frameBufferReuseEnable { + c.fr.Payload = nil // reset the payload, causing a new buffer to be allocated + } err := c.fr.Unmarshal(c.br) return &c.fr, err } diff --git a/pkg/conn/conn_test.go b/pkg/conn/conn_test.go index 77b6670d..18debc5c 100644 --- a/pkg/conn/conn_test.go +++ b/pkg/conn/conn_test.go @@ -76,6 +76,38 @@ func TestRead(t *testing.T) { } } +func TestReadInterleavedFrameWithBufferReuse(t *testing.T) { + buf := bytes.NewBuffer([]byte{0x24, 0x6, 0x0, 0x4, 0x1, 0x2, 0x3, 0x4, 0x24, 0x6, 0x0, 0x3, 0x2, 0x3, 0x4, 0x24, 0x6, 0x0, 0x5, 0x3, 0x4, 0x5, 0x6, 0x7}) + conn := NewConn(buf) + // read first packet + dec1, err1 := conn.Read() + require.NoError(t, err1) + require.Equal(t, &base.InterleavedFrame{ + Channel: 6, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, + }, dec1) + p1 := &dec1.(*base.InterleavedFrame).Payload + // read second packet + dec2, err2 := conn.Read() + require.NoError(t, err2) + require.Equal(t, &base.InterleavedFrame{ + Channel: 6, + Payload: []byte{0x02, 0x03, 0x04}, + }, dec2) + p2 := &dec2.(*base.InterleavedFrame).Payload + // read third packet + dec3, err3 := conn.Read() + require.NoError(t, err3) + require.Equal(t, &base.InterleavedFrame{ + Channel: 6, + Payload: []byte{0x03, 0x04, 0x05, 0x06, 0x07}, + }, dec3) + p3 := &dec3.(*base.InterleavedFrame).Payload + // assert that the buffer was reused + require.Equal(t, p1, p2) + require.Equal(t, p2, p3) +} + func TestReadError(t *testing.T) { var buf bytes.Buffer conn := NewConn(&buf)