Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to reuse buffers for interleaved frames #599

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ type Client struct {
BytesReceived *uint64
// pointer to a variable that stores sent bytes.
BytesSent *uint64
// enable reuse of buffers for incoming packets.
BufferReuseEnable bool

//
// system functions (all optional)
Expand Down Expand Up @@ -883,7 +885,11 @@ func (c *Client) connOpen() error {

c.nconn = nconn
bc := bytecounter.New(c.nconn, c.BytesReceived, c.BytesSent)
c.conn = conn.NewConn(bc)
if c.BufferReuseEnable {
c.conn = conn.NewConn(bc, conn.ConnOptionFrameBufferReuseEnable(true))
} else {
c.conn = conn.NewConn(bc)
}
c.reader = &clientReader{
c: c,
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/base/interleaved_frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ func (f *InterleavedFrame) Unmarshal(br *bufio.Reader) error {
payloadLen := int(uint16(header[2])<<8 | uint16(header[3]))

f.Channel = int(header[1])
f.Payload = make([]byte, payloadLen)

if cap(f.Payload) < payloadLen {
// if there's not enough space, extend the buffer
f.Payload = append(f.Payload[:cap(f.Payload)], make([]byte, payloadLen-cap(f.Payload))...)
} else {
// otherwise, set the len of the buffer to the payloadLen
f.Payload = f.Payload[:payloadLen]
}
_, err = io.ReadFull(br, f.Payload)
return err
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,28 @@ type Conn struct {

// reuse interleaved frames. they should never be passed to secondary routines
fr base.InterleavedFrame

frameBufferReuseEnable bool
}

// NewConn allocates a Conn.
func NewConn(rw io.ReadWriter) *Conn {
func NewConn(rw io.ReadWriter, opts ...ConnOption) *Conn {
return &Conn{
w: rw,
br: bufio.NewReaderSize(rw, readBufferSize),
}
}

// ConnOption is an option for Conn.
type ConnOption func(c *Conn)

// ConnOptionFrameBufferReuseEnable enables buffer reuse.
func ConnOptionFrameBufferReuseEnable(v bool) ConnOption {
return func(c *Conn) {
c.frameBufferReuseEnable = v
}
}

// Read reads a Request, a Response or an Interleaved frame.
func (c *Conn) Read() (interface{}, error) {
byts, err := c.br.Peek(2)
Expand Down Expand Up @@ -63,6 +75,9 @@ func (c *Conn) ReadResponse() (*base.Response, error) {

// ReadInterleavedFrame reads a InterleavedFrame.
func (c *Conn) ReadInterleavedFrame() (*base.InterleavedFrame, error) {
if !c.frameBufferReuseEnable {
c.fr.Payload = nil // reset the payload, causing a new buffer to be allocated
}
err := c.fr.Unmarshal(c.br)
return &c.fr, err
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,38 @@ func TestRead(t *testing.T) {
}
}

func TestReadInterleavedFrameWithBufferReuse(t *testing.T) {
buf := bytes.NewBuffer([]byte{0x24, 0x6, 0x0, 0x4, 0x1, 0x2, 0x3, 0x4, 0x24, 0x6, 0x0, 0x3, 0x2, 0x3, 0x4, 0x24, 0x6, 0x0, 0x5, 0x3, 0x4, 0x5, 0x6, 0x7})
conn := NewConn(buf)
// read first packet
dec1, err1 := conn.Read()
require.NoError(t, err1)
require.Equal(t, &base.InterleavedFrame{
Channel: 6,
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}, dec1)
p1 := &dec1.(*base.InterleavedFrame).Payload
// read second packet
dec2, err2 := conn.Read()
require.NoError(t, err2)
require.Equal(t, &base.InterleavedFrame{
Channel: 6,
Payload: []byte{0x02, 0x03, 0x04},
}, dec2)
p2 := &dec2.(*base.InterleavedFrame).Payload
// read third packet
dec3, err3 := conn.Read()
require.NoError(t, err3)
require.Equal(t, &base.InterleavedFrame{
Channel: 6,
Payload: []byte{0x03, 0x04, 0x05, 0x06, 0x07},
}, dec3)
p3 := &dec3.(*base.InterleavedFrame).Payload
// assert that the buffer was reused
require.Equal(t, p1, p2)
require.Equal(t, p2, p3)
}

func TestReadError(t *testing.T) {
var buf bytes.Buffer
conn := NewConn(&buf)
Expand Down
Loading