Skip to content

Commit

Permalink
Fix partial messages handling
Browse files Browse the repository at this point in the history
Kafka 2.0 introduced chunked message down conversions which might
produce partial messages at the end of the set. These partial messages
are not well formed beyond offset and length, so they might cause
strange decoding errors down the line.
This fix makes `lengthField` a `dynamicPushDecoder` so it checks that
the `packetDecoder` has at least that many bytes. It fixes the problem
above and I think it's a more robust check in general.
  • Loading branch information
vlad-arista committed Aug 10, 2018
1 parent e7238b1 commit 682468d
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 1 deletion.
76 changes: 76 additions & 0 deletions fetch_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,29 @@ var (
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}

overflowMessageFetchResponse = []byte{
0x00, 0x00, 0x00, 0x01,
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x05,
0x00, 0x01,
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10,
0x00, 0x00, 0x00, 0x30,
// messageSet
0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
0x00, 0x00, 0x00, 0x10,
// message
0x23, 0x96, 0x4a, 0xf7, // CRC
0x00,
0x00,
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE,
// overflow messageSet
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0xFF,
// overflow bytes
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}

oneRecordFetchResponse = []byte{
0x00, 0x00, 0x00, 0x00, // ThrottleTime
0x00, 0x00, 0x00, 0x01, // Number of Topics
Expand Down Expand Up @@ -148,6 +171,59 @@ func TestOneMessageFetchResponse(t *testing.T) {
}
}

func TestOverflowMessageFetchResponse(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(t, "overflow message", &response, overflowMessageFetchResponse, 0)

if len(response.Blocks) != 1 {
t.Fatal("Decoding produced incorrect number of topic blocks.")
}

if len(response.Blocks["topic"]) != 1 {
t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
}

block := response.GetBlock("topic", 5)
if block == nil {
t.Fatal("GetBlock didn't return block.")
}
if block.Err != ErrOffsetOutOfRange {
t.Error("Decoding didn't produce correct error code.")
}
if block.HighWaterMarkOffset != 0x10101010 {
t.Error("Decoding didn't produce correct high water mark offset.")
}
partial, err := block.Records.isPartial()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !partial {
t.Error("Overflow messages should be partial.")
}

n, err := block.Records.numRecords()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 1 {
t.Fatal("Decoding produced incorrect number of messages.")
}
msgBlock := block.Records.msgSet.Messages[0]
if msgBlock.Offset != 0x550000 {
t.Error("Decoding produced incorrect message offset.")
}
msg := msgBlock.Msg
if msg.Codec != CompressionNone {
t.Error("Decoding produced incorrect message compression.")
}
if msg.Key != nil {
t.Error("Decoding produced message key where there was none.")
}
if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
t.Error("Decoding produced incorrect message value.")
}
}

func TestOneRecordFetchResponse(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4)
Expand Down
15 changes: 14 additions & 1 deletion length_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,19 @@ import "encoding/binary"
// LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths.
type lengthField struct {
startOffset int
length int32
}

func (l *lengthField) decode(pd packetDecoder) error {
var err error
l.length, err = pd.getInt32()
if err != nil {
return err
}
if l.length > int32(pd.remaining()) {
return ErrInsufficientData
}
return nil
}

func (l *lengthField) saveOffset(in int) {
Expand All @@ -21,7 +34,7 @@ func (l *lengthField) run(curOffset int, buf []byte) error {
}

func (l *lengthField) check(curOffset int, buf []byte) error {
if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
if int32(curOffset-l.startOffset-4) != l.length {
return PacketDecodingError{"length field invalid"}
}

Expand Down

0 comments on commit 682468d

Please sign in to comment.