From bd35ef5643cb7318ccefb62050e2be84c04251b6 Mon Sep 17 00:00:00 2001 From: "qiheng.zhou" Date: Wed, 28 Aug 2024 10:09:02 +0800 Subject: [PATCH] fix: remove netpoll check --- pkg/remote/codec/default_codec.go | 26 ++++++++---------------- pkg/remote/codec/validate.go | 2 +- pkg/remote/default_bytebuf.go | 1 + pkg/remote/trans/netpoll/bytebuf.go | 6 ------ pkg/remote/trans/netpoll/bytebuf_test.go | 8 -------- 5 files changed, 10 insertions(+), 33 deletions(-) diff --git a/pkg/remote/codec/default_codec.go b/pkg/remote/codec/default_codec.go index 6e9442e543..1ae4c14d47 100644 --- a/pkg/remote/codec/default_codec.go +++ b/pkg/remote/codec/default_codec.go @@ -113,7 +113,6 @@ type defaultCodec struct { // Only effective when transport is TTHeader. // Payload is all the data after TTHeader. crc32Check bool - // TODO: allow multiple validators? // payloadValidator prepares a value based on payload in sender-side and validates the value in receiver-side. // It can only be used when ttheader is enabled. payloadValidator PayloadValidator @@ -154,6 +153,8 @@ func (c *defaultCodec) EncodePayload(ctx context.Context, message remote.Message return perrors.NewProtocolErrorWithMsg("no buffer allocated for the framed length field") } payloadLen = out.MallocLen() - headerLen + // Be careful here. The `frameLenField` was malloced before encoding payload + // If the `out` buffer using copy to grow when the capacity is not enough, setting the pre-allocated `frameLenField` will be invalid. binary.BigEndian.PutUint32(framedLenField, uint32(payloadLen)) } else if message.ProtocolInfo().CodecType == serviceinfo.Protobuf { return perrors.NewProtocolErrorWithMsg("protobuf just support 'framed' trans proto") @@ -282,15 +283,11 @@ func (c *defaultCodec) Name() string { // encodeMetaAndPayloadWithPayloadValidator encodes payload and meta with checksum of the payload. func (c *defaultCodec) encodeMetaAndPayloadWithPayloadValidator(ctx context.Context, message remote.Message, out remote.ByteBuffer, me remote.MetaEncoder) (err error) { - var ( - payloadOut = netpolltrans.NewWriterByteBuffer(netpoll.NewLinkBuffer()) - needRelease = true - ) + payloadOut := netpolltrans.NewWriterByteBuffer(netpoll.NewLinkBuffer()) defer func() { - if needRelease { - payloadOut.Release(err) - } + payloadOut.Release(err) }() + // 1. encode payload and calculate value via payload validator if err = me.EncodePayload(ctx, message, payloadOut); err != nil { return err @@ -316,17 +313,10 @@ func (c *defaultCodec) encodeMetaAndPayloadWithPayloadValidator(ctx context.Cont } // 3. write payload to the buffer after TTHeader - if netpolltrans.IsNetpollByteBuffer(out) { - // append buffer only if the input buffer is a netpollByteBuffer - // release will be executed in AppendBuffer, and thus set needRelease to false - err = out.AppendBuffer(payloadOut) - needRelease = false + if ncWriter, ok := out.(remote.NocopyWrite); ok { + err = ncWriter.WriteDirect(payload, 0) } else { - if ncWriter, ok := out.(remote.NocopyWrite); ok { - err = ncWriter.WriteDirect(payload, 0) - } else { - _, err = out.WriteBinary(payload) - } + _, err = out.WriteBinary(payload) } return err } diff --git a/pkg/remote/codec/validate.go b/pkg/remote/codec/validate.go index 306b9c1e1d..e63ca88d58 100644 --- a/pkg/remote/codec/validate.go +++ b/pkg/remote/codec/validate.go @@ -50,6 +50,7 @@ type PayloadValidator interface { // Validate validates the input payload with the attached checksum. // Return pass if validation succeed, or return error. + // DO NOT modify the input payload since it might be obtained by nocopy API from the underlying buffer. Validate(ctx context.Context, expectedValue string, inboundPayload []byte) (pass bool, err error) } @@ -175,7 +176,6 @@ type crcPayloadValidator struct{} var _ PayloadValidator = &crcPayloadValidator{} -// TODO: 2d slice func (p *crcPayloadValidator) Key(ctx context.Context) string { return transmeta.HeaderCRC32C } diff --git a/pkg/remote/default_bytebuf.go b/pkg/remote/default_bytebuf.go index 7ff1d5e191..fe7b3f3a94 100644 --- a/pkg/remote/default_bytebuf.go +++ b/pkg/remote/default_bytebuf.go @@ -38,6 +38,7 @@ func init() { } // NewWriterBuffer is used to create a defaultByteBuffer using the given size. +// NOTICE: defaultByteBuffer is only used for testing. func NewWriterBuffer(size int) ByteBuffer { return newWriterByteBuffer(size) } diff --git a/pkg/remote/trans/netpoll/bytebuf.go b/pkg/remote/trans/netpoll/bytebuf.go index 1eba85dea0..d2eee820c5 100644 --- a/pkg/remote/trans/netpoll/bytebuf.go +++ b/pkg/remote/trans/netpoll/bytebuf.go @@ -57,12 +57,6 @@ func NewReaderWriterByteBuffer(rw netpoll.ReadWriter) remote.ByteBuffer { return bytebuf } -// IsNetpollByteBuffer checks if b is a netpollByteBuffer -func IsNetpollByteBuffer(b remote.ByteBuffer) bool { - _, ok := b.(*netpollByteBuffer) - return ok -} - func newNetpollByteBuffer() interface{} { return &netpollByteBuffer{} } diff --git a/pkg/remote/trans/netpoll/bytebuf_test.go b/pkg/remote/trans/netpoll/bytebuf_test.go index 108b297f6e..9e624f3d06 100644 --- a/pkg/remote/trans/netpoll/bytebuf_test.go +++ b/pkg/remote/trans/netpoll/bytebuf_test.go @@ -204,14 +204,6 @@ func checkUnreadable(t *testing.T, buf remote.ByteBuffer) { test.Assert(t, err != nil) } -func TestIsNetpollByteBuffer(t *testing.T) { - defaultBuff := remote.NewReaderWriterBuffer(0) - test.Assert(t, IsNetpollByteBuffer(defaultBuff) == false) - - netpollBuff := NewReaderWriterByteBuffer(netpoll.NewLinkBuffer()) - test.Assert(t, IsNetpollByteBuffer(netpollBuff) == true) -} - func TestBytes(t *testing.T) { netpollBuff := NewReaderWriterByteBuffer(netpoll.NewLinkBuffer()) b := "testBytes"