Skip to content

Commit

Permalink
Consumer: handle compressed relative offsets
Browse files Browse the repository at this point in the history
New message format does something weird with these. See
https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets

Fixes #720. Supercedes #721. Thanks to @dynamix for the first draft of the fix.
  • Loading branch information
eapache committed Aug 23, 2016
1 parent 9489511 commit 16da292
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,21 +488,26 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
for _, msgBlock := range block.MsgSet.Messages {

for _, msg := range msgBlock.Messages() {
if prelude && msg.Offset < child.offset {
offset := msg.Offset
if msg.Msg.Version >= 1 {
baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
offset += baseOffset
}
if prelude && offset < child.offset {
continue
}
prelude = false

if msg.Offset >= child.offset {
if offset >= child.offset {
messages = append(messages, &ConsumerMessage{
Topic: child.topic,
Partition: child.partition,
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: msg.Offset,
Offset: offset,
Timestamp: msg.Msg.Timestamp,
})
child.offset = msg.Offset + 1
child.offset = offset + 1
} else {
incomplete = true
}
Expand Down

0 comments on commit 16da292

Please sign in to comment.