Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix partial messages handling #1149

Merged
merged 4 commits into from
Aug 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ env:
- KAFKA_HOSTNAME=localhost
- DEBUG=true
matrix:
- KAFKA_VERSION=0.11.0.2
- KAFKA_VERSION=1.0.0
- KAFKA_VERSION=1.1.0
- KAFKA_VERSION=2.0.0

before_install:
- export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}
Expand Down
2 changes: 1 addition & 1 deletion dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: sarama

up:
- go:
version: '1.9'
version: '1.10'

commands:
test:
Expand Down
23 changes: 17 additions & 6 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,26 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
return err
}

// If we have at least one full records, we skip incomplete ones
if partial && len(b.RecordsSet) > 0 {
break
n, err := records.numRecords()
if err != nil {
return err
}

b.RecordsSet = append(b.RecordsSet, records)
if n > 0 || (partial && len(b.RecordsSet) == 0) {
b.RecordsSet = append(b.RecordsSet, records)

if b.Records == nil {
b.Records = records
}
}

if b.Records == nil {
b.Records = records
overflow, err := records.isOverflow()
if err != nil {
return err
}

if partial || overflow {
break
}
}

Expand Down
83 changes: 83 additions & 0 deletions fetch_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,29 @@ var (
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}

overflowMessageFetchResponse = []byte{
0x00, 0x00, 0x00, 0x01,
0x00, 0x05, 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x05,
0x00, 0x01,
0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10,
0x00, 0x00, 0x00, 0x30,
// messageSet
0x00, 0x00, 0x00, 0x00, 0x00, 0x55, 0x00, 0x00,
0x00, 0x00, 0x00, 0x10,
// message
0x23, 0x96, 0x4a, 0xf7, // CRC
0x00,
0x00,
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x02, 0x00, 0xEE,
// overflow messageSet
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0xFF,
// overflow bytes
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}

oneRecordFetchResponse = []byte{
0x00, 0x00, 0x00, 0x00, // ThrottleTime
0x00, 0x00, 0x00, 0x01, // Number of Topics
Expand Down Expand Up @@ -148,6 +171,66 @@ func TestOneMessageFetchResponse(t *testing.T) {
}
}

func TestOverflowMessageFetchResponse(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(t, "overflow message", &response, overflowMessageFetchResponse, 0)

if len(response.Blocks) != 1 {
t.Fatal("Decoding produced incorrect number of topic blocks.")
}

if len(response.Blocks["topic"]) != 1 {
t.Fatal("Decoding produced incorrect number of partition blocks for topic.")
}

block := response.GetBlock("topic", 5)
if block == nil {
t.Fatal("GetBlock didn't return block.")
}
if block.Err != ErrOffsetOutOfRange {
t.Error("Decoding didn't produce correct error code.")
}
if block.HighWaterMarkOffset != 0x10101010 {
t.Error("Decoding didn't produce correct high water mark offset.")
}
partial, err := block.Records.isPartial()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if partial {
t.Error("Decoding detected a partial trailing message where there wasn't one.")
}
overflow, err := block.Records.isOverflow()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !overflow {
t.Error("Decoding detected a partial trailing message where there wasn't one.")
}

n, err := block.Records.numRecords()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if n != 1 {
t.Fatal("Decoding produced incorrect number of messages.")
}
msgBlock := block.Records.MsgSet.Messages[0]
if msgBlock.Offset != 0x550000 {
t.Error("Decoding produced incorrect message offset.")
}
msg := msgBlock.Msg
if msg.Codec != CompressionNone {
t.Error("Decoding produced incorrect message compression.")
}
if msg.Key != nil {
t.Error("Decoding produced message key where there was none.")
}
if !bytes.Equal(msg.Value, []byte{0x00, 0xEE}) {
t.Error("Decoding produced incorrect message value.")
}
}

func TestOneRecordFetchResponse(t *testing.T) {
response := FetchResponse{}
testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4)
Expand Down
15 changes: 14 additions & 1 deletion length_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,19 @@ import "encoding/binary"
// LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths.
type lengthField struct {
startOffset int
length int32
}

func (l *lengthField) decode(pd packetDecoder) error {
var err error
l.length, err = pd.getInt32()
if err != nil {
return err
}
if l.length > int32(pd.remaining()) {
return ErrInsufficientData
}
return nil
}

func (l *lengthField) saveOffset(in int) {
Expand All @@ -21,7 +34,7 @@ func (l *lengthField) run(curOffset int, buf []byte) error {
}

func (l *lengthField) check(curOffset int, buf []byte) error {
if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
if int32(curOffset-l.startOffset-4) != l.length {
return PacketDecodingError{"length field invalid"}
}

Expand Down
8 changes: 7 additions & 1 deletion message_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (msb *MessageBlock) decode(pd packetDecoder) (err error) {

type MessageSet struct {
PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
OverflowMessage bool // whether the set on the wire contained an overflow message
Messages []*MessageBlock
}

Expand Down Expand Up @@ -85,7 +86,12 @@ func (ms *MessageSet) decode(pd packetDecoder) (err error) {
case ErrInsufficientData:
// As an optimization the server is allowed to return a partial message at the
// end of the message set. Clients should handle this case. So we just ignore such things.
ms.PartialTrailingMessage = true
if msb.Offset == -1 {
// This is an overflow message caused by chunked down conversion
ms.OverflowMessage = true
} else {
ms.PartialTrailingMessage = true
}
return nil
default:
return err
Expand Down
21 changes: 21 additions & 0 deletions records.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,27 @@ func (r *Records) isControl() (bool, error) {
return false, fmt.Errorf("unknown records type: %v", r.recordsType)
}

func (r *Records) isOverflow() (bool, error) {
if r.recordsType == unknownRecords {
if empty, err := r.setTypeFromFields(); err != nil || empty {
return false, err
}
}

switch r.recordsType {
case unknownRecords:
return false, nil
case legacyRecords:
if r.MsgSet == nil {
return false, nil
}
return r.MsgSet.OverflowMessage, nil
case defaultRecords:
return false, nil
}
return false, fmt.Errorf("unknown records type: %v", r.recordsType)
}

func magicValue(pd packetDecoder) (int8, error) {
dec, err := pd.peek(magicOffset, magicLength)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ var (
V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
V1_0_0_0 = newKafkaVersion(1, 0, 0, 0)
V1_1_0_0 = newKafkaVersion(1, 1, 0, 0)
V2_0_0_0 = newKafkaVersion(2, 0, 0, 0)

SupportedVersions = []KafkaVersion{
V0_8_2_0,
Expand All @@ -173,9 +174,10 @@ var (
V0_11_0_2,
V1_0_0_0,
V1_1_0_0,
V2_0_0_0,
}
MinVersion = V0_8_2_0
MaxVersion = V1_1_0_0
MaxVersion = V2_0_0_0
)

func ParseKafkaVersion(s string) (KafkaVersion, error) {
Expand Down