Assign relative offsets in compressed message sets
As previously noted (IBM#903), Sarama currently doesn't assign inner
offsets in compressed messages. This means that the broker has to assign
relative offsets and then do recompression.

Which is fine! But if we assign offsets in the producer, then the broker can
skip recompression, improving broker CPU efficiency.
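
In concrete terms, "assigning inner offsets" just means numbering the messages inside the compressed set 0, 1, 2, ... before encoding, so they already match what the broker's validator expects. A minimal sketch of the idea, using a hypothetical Message type rather than Sarama's actual structs (the real change to produceSet is in the diff below):

```
package main

import "fmt"

// Message is a stand-in for a wire-format Kafka message; only the Offset
// field matters here. (Hypothetical type, not Sarama's actual struct.)
type Message struct {
	Offset int64
	Value  []byte
}

// assignRelativeOffsets numbers the inner messages of a compressed message
// set 0..N-1 before encoding, so the broker can keep the batch as-is and
// only stamp the outer offset instead of recompressing.
func assignRelativeOffsets(inner []*Message) {
	for i, msg := range inner {
		msg.Offset = int64(i)
	}
}

func main() {
	batch := []*Message{{Value: []byte("a")}, {Value: []byte("b")}, {Value: []byte("c")}}
	assignRelativeOffsets(batch)
	for _, msg := range batch {
		fmt.Println(msg.Offset) // 0, 1, 2
	}
}
```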

One way to see the effect is by adding some debug logging to Kafka with
the following patch:

```
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 224a79277..4ecb98687 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -20,6 +20,7 @@ import java.nio.ByteBuffer

 import kafka.common.LongRef
 import kafka.message.{CompressionCodec, InvalidMessageException, NoCompressionCodec}
+import org.apache.log4j.Logger
 import org.apache.kafka.common.errors.InvalidTimestampException
 import org.apache.kafka.common.record._

@@ -27,6 +28,7 @@ import scala.collection.mutable
 import scala.collection.JavaConverters._

 private[kafka] object LogValidator {
+  private val log = Logger.getLogger(getClass())

   /**
    * Update the offsets for this message set and do further validation on messages including:
@@ -172,8 +174,13 @@ private[kafka] object LogValidator {
         // Validate the timestamp
         validateTimestamp(record, now, messageTimestampType, messageTimestampDiffMaxMs)
         // Check if we need to overwrite offset, no in place assignment situation 3
-        if (logEntry.offset != expectedInnerOffset.getAndIncrement())
+        val incrementedInnerOffset = expectedInnerOffset.getAndIncrement()
+        if (logEntry.offset != incrementedInnerOffset) {
+          log.info("overwriting inner offset; got " + logEntry.offset + " expected " + incrementedInnerOffset)
           inPlaceAssignment = false
+        } else {
+          log.info("no need to overwrite offset; got " + logEntry.offset + " expected " + incrementedInnerOffset)
+        }
         if (record.timestamp > maxTimestamp)
           maxTimestamp = record.timestamp
       }
```

This produces the following broker logs:

Before:
```
[2017-12-13 13:30:04,667] INFO no need to overwrite offset; got 0 expected 0 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 1 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 2 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 3 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 4 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 5 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 6 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 7 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 8 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 9 (kafka.log.LogValidator$)
```

After:
```
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 0 expected 0 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 1 expected 1 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 2 expected 2 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 3 expected 3 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 4 expected 4 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 5 expected 5 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 6 expected 6 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 7 expected 7 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 8 expected 8 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 9 expected 9 (kafka.log.LogValidator$)
```
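
For context on how those relative offsets turn back into real log offsets: under message format v1 the outer wrapper message carries the absolute offset of the last inner message, and readers add each inner message's relative offset to the implied base. A hedged sketch of that arithmetic (not Sarama's or Kafka's actual decoding code):

```
package main

import "fmt"

// resolveAbsoluteOffsets turns relative inner offsets back into absolute
// log offsets. Under message format v1 the wrapper message carries the
// absolute offset of the last inner message, so the base offset is the
// wrapper offset minus the last relative offset. (A sketch of the scheme,
// not Sarama's or Kafka's actual decoder.)
func resolveAbsoluteOffsets(wrapperOffset int64, relative []int64) []int64 {
	if len(relative) == 0 {
		return nil
	}
	base := wrapperOffset - relative[len(relative)-1]
	absolute := make([]int64, len(relative))
	for i, r := range relative {
		absolute[i] = base + r
	}
	return absolute
}

func main() {
	// Ten inner messages with relative offsets 0..9 and a wrapper offset
	// of 109 occupy absolute offsets 100..109.
	relative := make([]int64, 10)
	for i := range relative {
		relative[i] = int64(i)
	}
	fmt.Println(resolveAbsoluteOffsets(109, relative))
}
```
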
emfree committed Dec 13, 2017
1 parent 6a8d89d commit a9359fa
Showing 2 changed files with 9 additions and 1 deletion.
produce_set.go (5 additions, 0 deletions):
```
@@ -132,6 +132,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 // and sent as the payload of a single fake "message" with the appropriate codec
 // set and no key. When the server sees a message with a compression codec, it
 // decompresses the payload and treats the result as its message set.
+for i, msg := range set.recordsToSend.msgSet.Messages {
+	// Assign relative offsets to the inner messages. This lets
+	// the broker avoid recompressing the message set.
+	msg.Offset = int64(i)
+}
 payload, err := encode(set.recordsToSend.msgSet, ps.parent.conf.MetricRegistry)
 if err != nil {
 	Logger.Println(err) // if this happens, it's basically our fault.
```
produce_set_test.go (4 additions, 1 deletion):
```
@@ -172,11 +172,14 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) {
 if err != nil {
 	t.Error("Failed to decode set from payload")
 }
-for _, compMsgBlock := range msg.Set.Messages {
+for i, compMsgBlock := range msg.Set.Messages {
 	compMsg := compMsgBlock.Msg
 	if compMsg.Version != 1 {
 		t.Error("Wrong compressed message version")
 	}
+	if compMsgBlock.Offset != int64(i) {
+		t.Error("Wrong relative inner offset")
+	}
 }
 if msg.Version != 1 {
 	t.Error("Wrong compressed parent message version")
```
