From 682468dcc75130e0813107ef154784a8b9e17c09 Mon Sep 17 00:00:00 2001 From: Vlad Hanciuta Date: Fri, 10 Aug 2018 17:21:03 +0100 Subject: [PATCH] Fix partial messages handling Kafka 2.0 introduced chunked message down conversions which might produce partial messages at the end of the set. These partial messages are not well formed beyond offset and length, so they might cause strange decoding errors down the line. This fix makes `lengthField` a `dynamicPushDecoder` so it checks that the `packetDecoder` has at least that many bytes. It fixes the problem above and I think it's a more robust check in general. --- fetch_response_test.go | 76 ++++++++++++++++++++++++++++++++++++++++++ length_field.go | 15 ++++++++- 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/fetch_response_test.go b/fetch_response_test.go index c6b6b46e4..9f4113015 100644 --- a/fetch_response_test.go +++ b/fetch_response_test.go @@ -27,6 +27,29 @@ var ( 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE} + overflowMessageFetchResponse = []byte{ + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x05, 't', 'o', 'p', 'i', 'c', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x05, + 0x00, 0x01, + 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, + 0x00, 0x00, 0x00, 0x30, + // messageSet + 0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x10, + // message + 0x23, 0x96, 0x4a, 0xf7, // CRC + 0x00, + 0x00, + 0xFF, 0xFF, 0xFF, 0xFF, + 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, + // overflow messageSet + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0x00, 0x00, 0x00, 0xFF, + // overflow bytes + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + oneRecordFetchResponse = []byte{ 0x00, 0x00, 0x00, 0x00, // ThrottleTime 0x00, 0x00, 0x00, 0x01, // Number of Topics @@ -148,6 +171,59 @@ func TestOneMessageFetchResponse(t *testing.T) { } } +func TestOverflowMessageFetchResponse(t *testing.T) { + response := FetchResponse{} + testVersionDecodable(t, "overflow message", &response, overflowMessageFetchResponse, 0) + + if len(response.Blocks) != 1 { + t.Fatal("Decoding produced incorrect number of topic blocks.") + } + + if len(response.Blocks["topic"]) != 1 { + t.Fatal("Decoding produced incorrect number of partition blocks for topic.") + } + + block := response.GetBlock("topic", 5) + if block == nil { + t.Fatal("GetBlock didn't return block.") + } + if block.Err != ErrOffsetOutOfRange { + t.Error("Decoding didn't produce correct error code.") + } + if block.HighWaterMarkOffset != 0x10101010 { + t.Error("Decoding didn't produce correct high water mark offset.") + } + partial, err := block.Records.isPartial() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !partial { + t.Error("Overflow messages should be partial.") + } + + n, err := block.Records.numRecords() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if n != 1 { + t.Fatal("Decoding produced incorrect number of messages.") + } + msgBlock := block.Records.msgSet.Messages[0] + if msgBlock.Offset != 0x550000 { + t.Error("Decoding produced incorrect message offset.") + } + msg := msgBlock.Msg + if msg.Codec != CompressionNone { + t.Error("Decoding produced incorrect message compression.") + } + if msg.Key != nil { + t.Error("Decoding produced message key where there was none.") + } + if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) { + t.Error("Decoding produced incorrect message value.") + } +} + func TestOneRecordFetchResponse(t *testing.T) { response := FetchResponse{} testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4) diff --git a/length_field.go b/length_field.go index 576b1a6f6..da199a70a 100644 --- a/length_field.go +++ b/length_field.go @@ -5,6 +5,19 @@ import "encoding/binary" // LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths. type lengthField struct { startOffset int + length int32 +} + +func (l *lengthField) decode(pd packetDecoder) error { + var err error + l.length, err = pd.getInt32() + if err != nil { + return err + } + if l.length > int32(pd.remaining()) { + return ErrInsufficientData + } + return nil } func (l *lengthField) saveOffset(in int) { @@ -21,7 +34,7 @@ func (l *lengthField) run(curOffset int, buf []byte) error { } func (l *lengthField) check(curOffset int, buf []byte) error { - if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) { + if int32(curOffset-l.startOffset-4) != l.length { return PacketDecodingError{"length field invalid"} }