Skip to content

Commit

Permalink
fix: revert netpoll bytebuffer Bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
ppzqh committed Aug 28, 2024
1 parent ef4c99e commit 7781f96
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 31 deletions.
13 changes: 11 additions & 2 deletions pkg/remote/codec/default_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ func (c *defaultCodec) Name() string {

// encodeMetaAndPayloadWithPayloadValidator encodes payload and meta with checksum of the payload.
func (c *defaultCodec) encodeMetaAndPayloadWithPayloadValidator(ctx context.Context, message remote.Message, out remote.ByteBuffer, me remote.MetaEncoder) (err error) {
payloadOut := netpolltrans.NewWriterByteBuffer(netpoll.NewLinkBuffer())
writer := netpoll.NewLinkBuffer()
payloadOut := netpolltrans.NewWriterByteBuffer(writer)
defer func() {
payloadOut.Release(err)
}()
Expand All @@ -279,7 +280,7 @@ func (c *defaultCodec) encodeMetaAndPayloadWithPayloadValidator(ctx context.Cont
}
// get the payload from buffer
// use copy api here because the payload will be used as an argument of Generate function in validator
payload, err := payloadOut.Bytes()
payload, err := getWrittenBytes(writer)
if err != nil {
return err
}
Expand Down Expand Up @@ -458,3 +459,11 @@ func checkPayloadSize(payloadLen, maxSize int) error {
}
return nil
}

// getWrittenBytes gets all written bytes from linkbuffer.
func getWrittenBytes(lb *netpoll.LinkBuffer) (buf []byte, err error) {
if err = lb.Flush(); err != nil {
return nil, err
}
return lb.Bytes(), nil
}
12 changes: 7 additions & 5 deletions pkg/remote/codec/default_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,14 @@ func TestDefaultCodecWithCRC32_Encode_Decode(t *testing.T) {
test.Assert(t, err != nil)

// encode with netpollBytebuffer
npBuffer := netpolltrans.NewReaderWriterByteBuffer(netpoll.NewLinkBuffer())
writer := netpoll.NewLinkBuffer()
npBuffer := netpolltrans.NewReaderWriterByteBuffer(writer)
err = dc.Encode(ctx, sendMsg, npBuffer)
test.Assert(t, err == nil, err)

// decode, succeed
recvMsg := initServerRecvMsg()
buf, err := npBuffer.Bytes()
buf, err := getWrittenBytes(writer)
test.Assert(t, err == nil, err)
in := remote.NewReaderBuffer(buf)
err = dc.Decode(ctx, recvMsg, in)
Expand Down Expand Up @@ -337,13 +338,14 @@ func TestDefaultCodecWithCustomizedValidator(t *testing.T) {
test.Assert(t, strings.Contains(err.Error(), "limit"), err)

// encode with netpollBytebuffer
npBuffer = netpolltrans.NewReaderWriterByteBuffer(netpoll.NewLinkBuffer())
writer := netpoll.NewLinkBuffer()
npBuffer = netpolltrans.NewReaderWriterByteBuffer(writer)
err = dc.Encode(ctx, sendMsg, npBuffer)
test.Assert(t, err == nil, err)

// decode, succeed
recvMsg := initServerRecvMsg()
buf, err := npBuffer.Bytes()
buf, err := getWrittenBytes(writer)
test.Assert(t, err == nil, err)
in := remote.NewReaderBuffer(buf)
err = dc.Decode(ctx, recvMsg, in)
Expand Down Expand Up @@ -417,7 +419,7 @@ func BenchmarkDefaultEncodeDecode(b *testing.B) {
codec := f()
sendMsg := initClientSendMsg(transport.TTHeader, msgLen)
// encode
out := remote.NewWriterBuffer(1024)
out := netpolltrans.NewWriterByteBuffer(netpoll.NewLinkBuffer())
err := codec.Encode(ctx, sendMsg, out)
test.Assert(b, err == nil, err)

Expand Down
17 changes: 12 additions & 5 deletions pkg/remote/trans/netpoll/bytebuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,12 @@ func (b *netpollByteBuffer) AppendBuffer(buf remote.ByteBuffer) (err error) {
return
}

// Bytes gets all written bytes.
// Bytes are not supported in netpoll bytebuf.
func (b *netpollByteBuffer) Bytes() (buf []byte, err error) {
lb := b.writer.(*netpoll.LinkBuffer)
if err = lb.Flush(); err != nil {
return nil, err
if b.reader != nil {
return b.reader.Peek(b.reader.Len())
}
return lb.Bytes(), nil
return nil, errors.New("method Bytes() not support in netpoll bytebuf")
}

// Release will free the buffer already read.
Expand All @@ -250,3 +249,11 @@ func (b *netpollByteBuffer) zero() {
b.status = 0
b.readSize = 0
}

// GetWrittenBytes gets all written bytes from linkbuffer.
func GetWrittenBytes(lb *netpoll.LinkBuffer) (buf []byte, err error) {
if err = lb.Flush(); err != nil {
return nil, err
}
return lb.Bytes(), nil
}
19 changes: 0 additions & 19 deletions pkg/remote/trans/netpoll/bytebuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,22 +203,3 @@ func checkUnreadable(t *testing.T, buf remote.ByteBuffer) {
_, err = buf.Read(nil)
test.Assert(t, err != nil)
}

func TestBytes(t *testing.T) {
netpollBuff := NewReaderWriterByteBuffer(netpoll.NewLinkBuffer())
b := "testBytes"
n, err := netpollBuff.WriteString(b)
test.Assert(t, n == len(b))
test.Assert(t, err == nil)

// Bytes
b1, err := netpollBuff.Bytes()
test.Assert(t, err == nil)
test.Assert(t, string(b1) == b)
}

func TestRelease(t *testing.T) {
buf := NewReaderWriterByteBuffer(netpoll.NewLinkBuffer())
buf.Release(nil)
buf.Release(nil)
}

0 comments on commit 7781f96

Please sign in to comment.