
Assign relative offsets in compressed message sets #1002

Merged: 2 commits into IBM:master on Dec 20, 2017

Conversation

@emfree (Contributor) commented Dec 13, 2017

As previously noted (#903), Sarama currently doesn't assign inner
offsets in compressed messages. This means that the broker has to assign
relative offsets and then do recompression.

Which is fine! But if we assign offsets in the producer, then the broker can
skip recompression, improving broker CPU efficiency.
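
For illustration, the producer-side change boils down to numbering the inner messages of a compressed set sequentially before wrapping them. A minimal sketch of that idea in Go (the type and field names here are illustrative, not Sarama's exact internals):

```
// innerMessage stands in for Sarama's inner-message structure; only the
// Offset field matters for this sketch.
type innerMessage struct {
	Offset int64
	// Key, Value, etc. omitted
}

// assignRelativeOffsets numbers the messages 0, 1, 2, ... so the broker's
// expected relative offsets already match and it can skip recompression.
func assignRelativeOffsets(msgs []*innerMessage) {
	for i, msg := range msgs {
		msg.Offset = int64(i)
	}
}
```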

One way to see the effect is by adding some debug logging to Kafka with
the following patch:

```

diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 224a79277..4ecb98687 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -20,6 +20,7 @@ import java.nio.ByteBuffer

 import kafka.common.LongRef
 import kafka.message.{CompressionCodec, InvalidMessageException, NoCompressionCodec}
+import org.apache.log4j.Logger
 import org.apache.kafka.common.errors.InvalidTimestampException
 import org.apache.kafka.common.record._

@@ -27,6 +28,7 @@ import scala.collection.mutable
 import scala.collection.JavaConverters._

 private[kafka] object LogValidator {
+  private val log = Logger.getLogger(getClass())

   /**
    * Update the offsets for this message set and do further validation on messages including:
@@ -172,8 +174,13 @@ private[kafka] object LogValidator {
         // Validate the timestamp
         validateTimestamp(record, now, messageTimestampType, messageTimestampDiffMaxMs)
         // Check if we need to overwrite offset, no in place assignment situation 3
-        if (logEntry.offset != expectedInnerOffset.getAndIncrement())
+        val incrementedInnerOffset = expectedInnerOffset.getAndIncrement()
+        if (logEntry.offset != incrementedInnerOffset) {
+          log.info("overwriting inner offset; got " + logEntry.offset + " expected " + incrementedInnerOffset)
           inPlaceAssignment = false
+        } else {
+          log.info("no need to overwrite offset; got " + logEntry.offset + " expected " + incrementedInnerOffset)
+        }
         if (record.timestamp > maxTimestamp)
           maxTimestamp = record.timestamp
       }

```

This produces the following broker logs:

Before:

```
[2017-12-13 13:30:04,667] INFO no need to overwrite offset; got 0 expected 0 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 1 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 2 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 3 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 4 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 5 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 6 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 7 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 8 (kafka.log.LogValidator$)
[2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 9 (kafka.log.LogValidator$)

```

After:

```

[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 0 expected 0 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 1 expected 1 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 2 expected 2 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 3 expected 3 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 4 expected 4 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 5 expected 5 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 6 expected 6 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 7 expected 7 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 8 expected 8 (kafka.log.LogValidator$)
[2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 9 expected 9 (kafka.log.LogValidator$)
```

@emfree (Contributor, Author) commented Dec 14, 2017

I'm sorry to say that I can't quite tell what made Travis CI fail there. It looks like it could be related to Travis's recent image update (others seem to be reporting similar problems such as travis-ci/travis-ci#8898), but if you can point me at any issues with the patch itself I'll definitely address them. Thank you!

@eapache (Contributor) commented Dec 14, 2017

Thanks, this looks great.

Just one question, which I'd raised on #903 as well: do we need to do something to maintain backwards-compatibility? Is this supported by older brokers or do we need to guard this behaviour with a version check?

Re. Travis, it has been really flaky recently; the issue you linked seems related. I'll see if I can get it working again, but it doesn't seem related to this PR specifically.

@emfree (Contributor, Author) commented Dec 14, 2017

Thank you!

> Just one question, which I'd raised on #903 as well: do we need to do something to maintain backwards-compatibility? Is this supported by older brokers or do we need to guard this behaviour with a version check?

Ah, right, thanks! This change is backwards compatible: Kafka brokers prior to version 0.10 always need to assign absolute offsets to the inner messages, and so they ignore any offsets supplied by the producer. However, it's simple to add a version check so that we don't need to worry about that at all; I'll update the PR.
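
For reference, the gate can be a simple check against the configured Kafka version. A minimal sketch, assuming a small helper around Sarama's real version API (the helper itself is illustrative, not the code in this PR):

```
import "github.com/Shopify/sarama"

// shouldAssignRelativeOffsets reports whether the producer should
// pre-assign relative inner offsets: brokers older than 0.10 rewrite
// them anyway, so only do it for 0.10 and newer.
func shouldAssignRelativeOffsets(conf *sarama.Config) bool {
	return conf.Version.IsAtLeast(sarama.V0_10_0_0)
}
```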

@emfree (Contributor, Author) commented Dec 20, 2017

(Note: PR updated to add a Kafka version check!)

@eapache (Contributor) commented Dec 20, 2017

This looks great to me, thanks! Just rebase on master so CI runs properly and I'll merge it.

@emfree (Contributor, Author) commented Dec 20, 2017

Excellent, thanks. Rebased and CI looks happy.

@eapache merged commit f144d11 into IBM:master on Dec 20, 2017
@bobrik (Contributor) commented Dec 29, 2017

It looks like this doesn't address recompression on Kafka 1.0.0. I'm not very familiar with the code, but maybe it's because this change doesn't affect the 0.11.0.0+ path, which goes through here:

```
if req.Version >= 3 {
	req.AddBatch(topic, partition, set.recordsToSend.recordBatch)
	continue
}
```
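
For context, records on that v2 (record-batch) path carry an offset delta relative to the batch base offset rather than an absolute inner offset, so the analogous producer-side fix would be to number those deltas sequentially. A sketch, using the names from the snippet above (and mirroring the patch proposed further down):

```
// Give every record in the batch an offset delta equal to its index,
// so the broker sees the deltas it expects (0, 1, 2, ...).
for i, record := range set.recordsToSend.recordBatch.Records {
	record.OffsetDelta = int64(i)
}
```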

We're seeing high broker CPU usage with Snappy compression; the broker stack looks like this:

```
"kafka-request-handler-3" #87 daemon prio=5 os_prio=0 tid=0x00007f80d2e97800 nid=0x1194 runnable [0x00007f7ee1adc000]
   java.lang.Thread.State: RUNNABLE
	at org.xerial.snappy.SnappyNative.rawCompress(Native Method)
	at org.xerial.snappy.Snappy.rawCompress(Snappy.java:446)
	at org.xerial.snappy.Snappy.compress(Snappy.java:119)
	at org.xerial.snappy.SnappyOutputStream.compressInput(SnappyOutputStream.java:376)
	at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:130)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	- locked <0x00000007a74cc8f0> (a java.io.DataOutputStream)
	at org.apache.kafka.common.utils.Utils.writeTo(Utils.java:861)
	at org.apache.kafka.common.record.DefaultRecord.writeTo(DefaultRecord.java:203)
	at org.apache.kafka.common.record.MemoryRecordsBuilder.appendDefaultRecord(MemoryRecordsBuilder.java:622)
	at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:409)
	at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:442)
	at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:595)
	at kafka.log.LogValidator$.$anonfun$buildRecordsAndAssignOffsets$1(LogValidator.scala:336)
	at kafka.log.LogValidator$.$anonfun$buildRecordsAndAssignOffsets$1$adapted(LogValidator.scala:335)
	at kafka.log.LogValidator$$$Lambda$675/1035377790.apply(Unknown Source)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at kafka.log.LogValidator$.buildRecordsAndAssignOffsets(LogValidator.scala:335)
	at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:288)
	at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:71)
	at kafka.log.Log.liftedTree1$1(Log.scala:654)
	at kafka.log.Log.$anonfun$append$2(Log.scala:642)
	- locked <0x0000000640068e88> (a java.lang.Object)
	at kafka.log.Log$$Lambda$627/239353060.apply(Unknown Source)
	at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
	at kafka.log.Log.append(Log.scala:624)
	at kafka.log.Log.appendAsLeader(Log.scala:597)
	at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:499)
	at kafka.cluster.Partition$$Lambda$625/1001513143.apply(Unknown Source)
	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
	at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:223)
	at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:487)
	at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:724)
	at kafka.server.ReplicaManager$$Lambda$624/2052953875.apply(Unknown Source)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$Lambda$12/187472540.apply(Unknown Source)
	at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:138)
	at scala.collection.mutable.HashMap$$Lambda$25/1864869682.apply(Unknown Source)
	at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:236)
	at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:229)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:138)
	at scala.collection.TraversableLike.map(TraversableLike.scala:234)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:708)
	at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:459)
	at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:466)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:99)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
	at java.lang.Thread.run(Thread.java:748)
```

@bobrik (Contributor) commented Dec 29, 2017

I pinpointed the issue to these lines:

My patch for Kafka to get the logs:

```
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 15750e9cd..5197d0885 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 import kafka.common.LongRef
 import kafka.message.{CompressionCodec, NoCompressionCodec}
 import kafka.utils.Logging
+import org.apache.log4j.Logger
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
@@ -236,6 +237,7 @@ private[kafka] object LogValidator extends Logging {
 
       // No in place assignment situation 1 and 2
       var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0
+      logger.info("inPlaceAssignment = " + inPlaceAssignment + ", condition: sourceCodec (" + sourceCodec + ") == targetCodec (" + targetCodec + ") && toMagic (" + toMagic + ") > RecordBatch.MAGIC_VALUE_V0 (" + RecordBatch.MAGIC_VALUE_V0 + ")")
 
       var maxTimestamp = RecordBatch.NO_TIMESTAMP
       val expectedInnerOffset = new LongRef(0)
@@ -250,6 +252,7 @@ private[kafka] object LogValidator extends Logging {
         // Do not compress control records unless they are written compressed
         if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
           inPlaceAssignment = true
+          logger.info("inPlaceAssignment = " + inPlaceAssignment + ", condition: sourceCodec (" + sourceCodec + ") == NoCompressionCodec (" + NoCompressionCodec + ") && batch.isControlBatch (" + batch.isControlBatch + ")")
 
         for (record <- batch.asScala) {
           validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
@@ -261,21 +264,26 @@ private[kafka] object LogValidator extends Logging {
           if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
             // Check if we need to overwrite offset
             // No in place assignment situation 3
-            if (record.offset != expectedInnerOffset.getAndIncrement())
+            val off = expectedInnerOffset.getAndIncrement()
+            if (record.offset != off)
               inPlaceAssignment = false
+              logger.info("inPlaceAssignment = " + inPlaceAssignment + ", condition: record.offset (" + record.offset + ") != expectedInnerOffset.getAndIncrement() (" + off + ")")
             if (record.timestamp > maxTimestamp)
               maxTimestamp = record.timestamp
           }
 
           // No in place assignment situation 4
-          if (!record.hasMagic(toMagic))
+          if (!record.hasMagic(toMagic)) {
+            logger.info("inPlaceAssignment = " + inPlaceAssignment + ", condition: !record.hasMagic(toMagic) (" + !record.hasMagic(toMagic) + ")")
             inPlaceAssignment = false
+          }
 
           validatedRecords += record
         }
       }
 
       if (!inPlaceAssignment) {
+        logger.info("inPlaceAssignment = " + inPlaceAssignment + "; recompressing")
         val (producerId, producerEpoch, sequence, isTransactional) = {
           // note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
           // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records

```

Log output:

```
Dec 29 23:18:59 36s294 kafka[33461]: INFO inPlaceAssignment = true, condition: sourceCodec (SnappyCompressionCodec) == targetCodec (SnappyCompressionCodec) && toMagic (2) > RecordBatch.MAGIC_VALUE_V0 (0) (kafka.log.LogValidator$)
Dec 29 23:18:59 36s294 kafka[33461]: INFO inPlaceAssignment = true, condition: sourceCodec (SnappyCompressionCodec) == NoCompressionCodec (NoCompressionCodec) && batch.isControlBatch (false) (kafka.log.LogValidator$)
Dec 29 23:18:59 36s294 kafka[33461]: INFO inPlaceAssignment = true, condition: record.offset (0) != expectedInnerOffset.getAndIncrement() (0) (kafka.log.LogValidator$)
Dec 29 23:18:59 36s294 kafka[33461]: INFO inPlaceAssignment = false, condition: record.offset (0) != expectedInnerOffset.getAndIncrement() (1) (kafka.log.LogValidator$)
Dec 29 23:18:59 36s294 kafka[33461]: INFO inPlaceAssignment = false, condition: record.offset (0) != expectedInnerOffset.getAndIncrement() (2) (kafka.log.LogValidator$)
Dec 29 23:18:59 36s294 kafka[33461]: INFO inPlaceAssignment = false, condition: record.offset (0) != expectedInnerOffset.getAndIncrement() (3) (kafka.log.LogValidator$)
Dec 29 23:18:59 36s294 kafka[33461]: INFO inPlaceAssignment = false, condition: record.offset (0) != expectedInnerOffset.getAndIncrement() (4) (kafka.log.LogValidator$)
Dec 29 23:18:59 36s294 kafka[33461]: INFO inPlaceAssignment = false, condition: record.offset (0) != expectedInnerOffset.getAndIncrement() (5) (kafka.log.LogValidator$)
Dec 29 23:18:59 36s294 kafka[33461]: INFO inPlaceAssignment = false, condition: record.offset (0) != expectedInnerOffset.getAndIncrement() (6) (kafka.log.LogValidator$)
...
```

@bobrik (Contributor) commented Dec 29, 2017

I think I got it:

```
diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
index 61eb3f0..b9eac95 100644
--- a/vendor/github.com/Shopify/sarama/produce_set.go
+++ b/vendor/github.com/Shopify/sarama/produce_set.go
@@ -122,6 +122,10 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
        for topic, partitionSet := range ps.msgs {
                for partition, set := range partitionSet {
                        if req.Version >= 3 {
+                               for i, record := range set.recordsToSend.recordBatch.Records {
+                                       record.OffsetDelta = int64(i)
+                               }
+
                                req.AddBatch(topic, partition, set.recordsToSend.recordBatch)
                                continue
                        }
```

@emfree, @eapache, does this seem reasonable to you? Should I make a PR?
