From f0d0b0f8b66286614b0ddc1cbe5291f99ac979fd Mon Sep 17 00:00:00 2001 From: Eben Freeman Date: Thu, 14 Dec 2017 12:32:44 -0800 Subject: [PATCH] Restrict inner offset assignment to versions 0.10+ --- produce_set.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/produce_set.go b/produce_set.go index 7bbdd90cb..61eb3f018 100644 --- a/produce_set.go +++ b/produce_set.go @@ -132,10 +132,16 @@ func (ps *produceSet) buildRequest() *ProduceRequest { // and sent as the payload of a single fake "message" with the appropriate codec // set and no key. When the server sees a message with a compression codec, it // decompresses the payload and treats the result as its message set. - for i, msg := range set.recordsToSend.msgSet.Messages { - // Assign relative offsets to the inner messages. This lets - // the broker avoid recompressing the message set. - msg.Offset = int64(i) + + if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { + // If our version is 0.10 or later, assign relative offsets + // to the inner messages. This lets the broker avoid + // recompressing the message set. + // (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets + // for details on relative offsets.) + for i, msg := range set.recordsToSend.msgSet.Messages { + msg.Offset = int64(i) + } } payload, err := encode(set.recordsToSend.msgSet, ps.parent.conf.MetricRegistry) if err != nil {