
Sarama library improvement proof of concept to avoid Kafka broker re-compression #903

Closed
lintang0 opened this issue Jun 29, 2017 · 2 comments

Comments

@lintang0

Versions
  • Sarama Version: f7e3024
  • Kafka Version: 0.10.1.0
  • Go Version: 1.8
Sarama Configuration
config.Version = sarama.V0_10_1_0
config.Producer.Compression = sarama.CompressionGZIP
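
For context, a minimal producer using the Sarama configuration above might look like the sketch below. The broker address, topic name, and message values are placeholders, and the import path is the Shopify/sarama path in use when this issue was filed (the project now lives under IBM/sarama).

```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Same settings as in the report: speak the 0.10.1.0 protocol and
	// gzip-compress outgoing batches.
	config := sarama.NewConfig()
	config.Version = sarama.V0_10_1_0
	config.Producer.Compression = sarama.CompressionGZIP
	config.Producer.Return.Successes = true // required by SyncProducer

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.Close()

	// Send five messages in one call so they can be batched into a single
	// compressed message set, as in the broker logs shown below.
	msgs := make([]*sarama.ProducerMessage, 0, 5)
	for i := 0; i < 5; i++ {
		msgs = append(msgs, &sarama.ProducerMessage{
			Topic: "test-topic",
			Value: sarama.StringEncoder(fmt.Sprintf("message-%d", i)),
		})
	}
	if err := producer.SendMessages(msgs); err != nil {
		log.Fatalln(err)
	}
}
```
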
Kafka Configuration in server.properties
 compression.type=producer
Kafka server.log
```
[2017-06-28 17:34:06,452] INFO BBMS : messageAndOffset.offset:expectedInnerOffset : 0:0 (kafka.message.ByteBufferMessageSet)
[2017-06-28 17:34:06,453] INFO BBMS : messageAndOffset.offset:expectedInnerOffset : 0:1 (kafka.message.ByteBufferMessageSet)
[2017-06-28 17:34:06,453] INFO BBMS : messageAndOffset.offset:expectedInnerOffset : 0:2 (kafka.message.ByteBufferMessageSet)
[2017-06-28 17:34:06,453] INFO BBMS : messageAndOffset.offset:expectedInnerOffset : 0:3 (kafka.message.ByteBufferMessageSet)
[2017-06-28 17:34:06,454] INFO BBMS : messageAndOffset.offset:expectedInnerOffset : 0:4 (kafka.message.ByteBufferMessageSet)
[2017-06-28 17:34:06,454] INFO BBMS : Recompressing Message  (kafka.message.ByteBufferMessageSet)
```
Problem Description

The Kafka broker re-compresses compressed messages it receives because their inner offsets are all zero, as shown in the kafka server.log above. KIP-31 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets) discusses moving to relative offsets in compressed message sets.
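
To make the relative-offset scheme concrete: under KIP-31 the records inside a compressed wrapper carry relative offsets 0, 1, ..., N-1, and the wrapper message carries the relative offset of the last inner record, so the broker can assign absolute offsets in place instead of decompressing and re-compressing. The sketch below only illustrates that numbering, using a made-up record type rather than Sarama's internal types.

```go
package main

import "fmt"

// innerRecord stands in for one record inside a compressed wrapper message.
type innerRecord struct {
	Offset int64
	Value  []byte
}

// assignRelativeOffsets numbers the inner records 0..N-1, as KIP-31 expects,
// and returns the offset the wrapper message should carry (N-1). When the
// broker sees this layout it can keep the producer's compressed bytes as-is.
func assignRelativeOffsets(records []innerRecord) (wrapperOffset int64) {
	for i := range records {
		records[i].Offset = int64(i)
	}
	return int64(len(records) - 1)
}

func main() {
	batch := make([]innerRecord, 5)
	wrapper := assignRelativeOffsets(batch)
	for _, r := range batch {
		fmt.Println("inner offset:", r.Offset) // 0, 1, 2, 3, 4
	}
	fmt.Println("wrapper offset:", wrapper) // 4
}
```
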

Here is a proof of concept in Sarama's message_set.go that demonstrates the improvement when the broker does not have to re-compress. The broker's CPU usage dropped by about 30% in the test environment.

```
diff --git a/message_set.go b/message_set.go
@@ -51,7 +51,16 @@ type MessageSet struct {
 }

 func (ms *MessageSet) encode(pe packetEncoder) error {
+
+	var correction int
+	if len(ms.Messages) == 1 && ms.Messages[0].Msg.Set != nil {
+		correction = len(ms.Messages[0].Msg.Set.Messages)
+	} else {
+		correction = len(ms.Messages)
+	}
+
 	for i := range ms.Messages {
+		ms.Messages[i].Offset = int64(i + correction - 1)
 		err := ms.Messages[i].encode(pe)
 		if err != nil {
 			return err
```

With the above update, the broker doesn't need to re-compress the messages, since the Sarama library already sets the inner offsets to 0, 1, 2, ..., which resulted in roughly 30% lower broker CPU usage in the test setup. Here is the broker log with 5 messages produced:

```
[2017-06-28 17:34:33,989] INFO BBMS : messageAndOffset.offset:expectedInnerOffset : 0:0 (kafka.message.ByteBufferMessageSet)
[2017-06-28 17:34:33,989] INFO BBMS : messageAndOffset.offset:expectedInnerOffset : 1:1 (kafka.message.ByteBufferMessageSet)
[2017-06-28 17:34:33,989] INFO BBMS : messageAndOffset.offset:expectedInnerOffset : 2:2 (kafka.message.ByteBufferMessageSet)
[2017-06-28 17:34:33,990] INFO BBMS : messageAndOffset.offset:expectedInnerOffset : 3:3 (kafka.message.ByteBufferMessageSet)
[2017-06-28 17:34:33,990] INFO BBMS : messageAndOffset.offset:expectedInnerOffset : 4:4 (kafka.message.ByteBufferMessageSet)
[2017-06-28 17:34:33,990] INFO BBMS : Original Message Retained :  (kafka.message.ByteBufferMessageSet)
```

The following patch, applied to the Kafka 0.10.1.0 source (https://github.com/apache/kafka/archive/0.10.1.0.tar.gz), was used to produce the debug log above:

```
@@ -448,14 +448,17 @@
       // Validate the timestamp
       validateTimestamp(message, now, messageTimestampType, messageTimestampDiffMaxMs)
       // Check if we need to overwrite offset
-      if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement())
+      var innerOffsetIncrement = expectedInnerOffset.getAndIncrement()
+      if (messageAndOffset.offset != innerOffsetIncrement)
         inPlaceAssignment = false
       if (message.timestamp > maxTimestamp) {
         maxTimestamp = message.timestamp
         offsetOfMaxTimestamp = offsetCounter.value + expectedInnerOffset.value - 1
       }
+      info("BBMS : messageAndOffset.offset:expectedInnerOffset : " + messageAndOffset.offset + ":" + innerOffsetIncrement)
     }

     if (sourceCodec != NoCompressionCodec && message.compressionCodec != NoCompressionCodec)
       throw new InvalidMessageException("Compressed outer message should not have an inner message with a " +
         s"compression attribute set: $message")

@@ -478,6 +481,7 @@
       (Some(now), {if (targetCodec == NoCompressionCodec) offsetCounter.value else offsetCounter.value + validatedMessages.length - 1})
     }

+    info("BBMS : Recompressing Message ")
     ValidationAndOffsetAssignResult(validatedMessages = new ByteBufferMessageSet(compressionCodec = targetCodec,
                                                                                  offsetCounter = offsetCounter,
                                                                                  wrapperMessageTimestamp = largestTimestampOfMessageSet,

@@ -487,6 +491,7 @@
                                     offsetOfMaxTimestamp = offsetOfMaxTimestampInMessageSet,
                                     messageSizeMaybeChanged = true)
     } else {
+      info("BBMS : Original Message Retained : " )
       // Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message.
       buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1)
       // validate the messages
```

eapache commented Jun 29, 2017

Makes sense to me, I'd be happy to consider this as a Pull Request.

My only concern would be whether we need to do something to maintain backwards compatibility for older brokers that don't know how to deal with relative inner offsets.
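
For what it's worth, one way to express that guard would be to key the new behaviour off the version the user already configures, since relative inner offsets only arrived with message format v1 in Kafka 0.10.0.0. The sketch below only shows the version check itself, using Sarama's existing `config.Version.IsAtLeast` API; actually threading the resulting flag down into message_set.go is the plumbing a pull request would still need to add.

```go
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V0_10_1_0

	// Only brokers speaking message format v1 (Kafka >= 0.10.0.0) understand
	// relative inner offsets; for anything older, keep the current behaviour
	// of leaving inner offsets at zero and letting the broker assign them.
	assignRelativeOffsets := config.Version.IsAtLeast(sarama.V0_10_0_0)
	fmt.Println("assign relative inner offsets in the producer:", assignRelativeOffsets)
}
```
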


eapache commented Aug 31, 2017

Closing as stale. Please re-open as a pull request if you are still interested.

eapache closed this as completed Aug 31, 2017
emfree added a commit to emfree/sarama that referenced this issue Dec 13, 2017
As previously noted (IBM#903), Sarama currently doesn't assign inner
offsets in compressed messages. This means that the broker has to assign
relative offsets and then do recompression.

This patch sets relative offsets, which means that brokers don't have to
recompress, and can use less CPU.

One way to see the effect is with a rough Kafka patch:

```
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 224a79277..4ecb98687 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -20,6 +20,7 @@ import java.nio.ByteBuffer

 import kafka.common.LongRef
 import kafka.message.{CompressionCodec, InvalidMessageException, NoCompressionCodec}
+import org.apache.log4j.Logger
 import org.apache.kafka.common.errors.InvalidTimestampException
 import org.apache.kafka.common.record._

@@ -27,6 +28,7 @@ import scala.collection.mutable
 import scala.collection.JavaConverters._

 private[kafka] object LogValidator {
+  private val log = Logger.getLogger(getClass())

   /**
    * Update the offsets for this message set and do further validation on messages including:
@@ -172,8 +174,13 @@ private[kafka] object LogValidator {
         // Validate the timestamp
         validateTimestamp(record, now, messageTimestampType, messageTimestampDiffMaxMs)
         // Check if we need to overwrite offset, no in place assignment situation 3
-        if (logEntry.offset != expectedInnerOffset.getAndIncrement())
+        val incrementedInnerOffset = expectedInnerOffset.getAndIncrement()
+        if (logEntry.offset != incrementedInnerOffset) {
+          log.info("overwriting inner offset; got " + logEntry.offset + " expected " + incrementedInnerOffset)
           inPlaceAssignment = false
+        } else {
+          log.info("no need to overwrite offset; got " + logEntry.offset + " expected " + incrementedInnerOffset)
+        }
         if (record.timestamp > maxTimestamp)
           maxTimestamp = record.timestamp
       }
```

This produces the following broker logs:

Before:
```
[2017-12-13 13:30:04,667] INFO no need to overwrite offset; got 0 expected 0 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 1 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 2 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 3 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 4 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 5 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 6 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 7 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 8 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 9 (kafka.log.LogValidator$)
```

After:
```
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 0 expected 0 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 1 expected 1 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 2 expected 2 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 3 expected 3 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 4 expected 4 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 5 expected 5 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 6 expected 6 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 7 expected 7 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 8 expected 8 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 9 expected 9 (kafka.log.LogValidator$)
```
emfree added a commit to emfree/sarama that referenced this issue Dec 13, 2017
emfree added a commit to emfree/sarama that referenced this issue Dec 20, 2017