Skip to content

Commit

Permalink
fix: remove netpoll check
Browse files Browse the repository at this point in the history
  • Loading branch information
ppzqh committed Aug 28, 2024
1 parent d3caf15 commit bd35ef5
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 33 deletions.
26 changes: 8 additions & 18 deletions pkg/remote/codec/default_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ type defaultCodec struct {
// Only effective when transport is TTHeader.
// Payload is all the data after TTHeader.
crc32Check bool
// TODO: allow multiple validators?
// payloadValidator prepares a value based on payload in sender-side and validates the value in receiver-side.
// It can only be used when ttheader is enabled.
payloadValidator PayloadValidator
Expand Down Expand Up @@ -154,6 +153,8 @@ func (c *defaultCodec) EncodePayload(ctx context.Context, message remote.Message
return perrors.NewProtocolErrorWithMsg("no buffer allocated for the framed length field")
}
payloadLen = out.MallocLen() - headerLen
// Be careful here. The `frameLenField` was malloced before encoding payload
// If the `out` buffer using copy to grow when the capacity is not enough, setting the pre-allocated `frameLenField` will be invalid.
binary.BigEndian.PutUint32(framedLenField, uint32(payloadLen))
} else if message.ProtocolInfo().CodecType == serviceinfo.Protobuf {
return perrors.NewProtocolErrorWithMsg("protobuf just support 'framed' trans proto")
Expand Down Expand Up @@ -282,15 +283,11 @@ func (c *defaultCodec) Name() string {

// encodeMetaAndPayloadWithPayloadValidator encodes payload and meta with checksum of the payload.
func (c *defaultCodec) encodeMetaAndPayloadWithPayloadValidator(ctx context.Context, message remote.Message, out remote.ByteBuffer, me remote.MetaEncoder) (err error) {
var (
payloadOut = netpolltrans.NewWriterByteBuffer(netpoll.NewLinkBuffer())
needRelease = true
)
payloadOut := netpolltrans.NewWriterByteBuffer(netpoll.NewLinkBuffer())
defer func() {
if needRelease {
payloadOut.Release(err)
}
payloadOut.Release(err)
}()

// 1. encode payload and calculate value via payload validator
if err = me.EncodePayload(ctx, message, payloadOut); err != nil {
return err
Expand All @@ -316,17 +313,10 @@ func (c *defaultCodec) encodeMetaAndPayloadWithPayloadValidator(ctx context.Cont
}

// 3. write payload to the buffer after TTHeader
if netpolltrans.IsNetpollByteBuffer(out) {
// append buffer only if the input buffer is a netpollByteBuffer
// release will be executed in AppendBuffer, and thus set needRelease to false
err = out.AppendBuffer(payloadOut)
needRelease = false
if ncWriter, ok := out.(remote.NocopyWrite); ok {
err = ncWriter.WriteDirect(payload, 0)
} else {
if ncWriter, ok := out.(remote.NocopyWrite); ok {
err = ncWriter.WriteDirect(payload, 0)
} else {
_, err = out.WriteBinary(payload)
}
_, err = out.WriteBinary(payload)
}
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/remote/codec/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type PayloadValidator interface {

// Validate validates the input payload with the attached checksum.
// Return pass if validation succeed, or return error.
// DO NOT modify the input payload since it might be obtained by nocopy API from the underlying buffer.
Validate(ctx context.Context, expectedValue string, inboundPayload []byte) (pass bool, err error)
}

Expand Down Expand Up @@ -175,7 +176,6 @@ type crcPayloadValidator struct{}

var _ PayloadValidator = &crcPayloadValidator{}

// TODO: 2d slice
func (p *crcPayloadValidator) Key(ctx context.Context) string {
return transmeta.HeaderCRC32C
}
Expand Down
1 change: 1 addition & 0 deletions pkg/remote/default_bytebuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func init() {
}

// NewWriterBuffer is used to create a defaultByteBuffer using the given size.
// NOTICE: defaultByteBuffer is only used for testing.
func NewWriterBuffer(size int) ByteBuffer {
return newWriterByteBuffer(size)
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/remote/trans/netpoll/bytebuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,6 @@ func NewReaderWriterByteBuffer(rw netpoll.ReadWriter) remote.ByteBuffer {
return bytebuf
}

// IsNetpollByteBuffer checks if b is a netpollByteBuffer
func IsNetpollByteBuffer(b remote.ByteBuffer) bool {
_, ok := b.(*netpollByteBuffer)
return ok
}

func newNetpollByteBuffer() interface{} {
return &netpollByteBuffer{}
}
Expand Down
8 changes: 0 additions & 8 deletions pkg/remote/trans/netpoll/bytebuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,6 @@ func checkUnreadable(t *testing.T, buf remote.ByteBuffer) {
test.Assert(t, err != nil)
}

func TestIsNetpollByteBuffer(t *testing.T) {
defaultBuff := remote.NewReaderWriterBuffer(0)
test.Assert(t, IsNetpollByteBuffer(defaultBuff) == false)

netpollBuff := NewReaderWriterByteBuffer(netpoll.NewLinkBuffer())
test.Assert(t, IsNetpollByteBuffer(netpollBuff) == true)
}

func TestBytes(t *testing.T) {
netpollBuff := NewReaderWriterByteBuffer(netpoll.NewLinkBuffer())
b := "testBytes"
Expand Down

0 comments on commit bd35ef5

Please sign in to comment.