
Assign relative offsets in compressed message sets #1002

Merged · 2 commits · Dec 20, 2017

Commits on Dec 20, 2017

  1. Assign relative offsets in compressed message sets

    As previously noted (IBM#903), Sarama currently doesn't assign inner
    offsets in compressed messages. This means that the broker has to assign
    relative offsets and then do recompression.
    
    Which is fine! But if we assign offsets in the producer, then the broker can
    skip recompression, improving broker CPU efficiency.
    
    One way to see the effect is by adding some debug logging to Kafka with
    the following patch:
    
    ```
    diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
    index 224a79277..4ecb98687 100644
    --- a/core/src/main/scala/kafka/log/LogValidator.scala
    +++ b/core/src/main/scala/kafka/log/LogValidator.scala
    @@ -20,6 +20,7 @@ import java.nio.ByteBuffer
    
     import kafka.common.LongRef
     import kafka.message.{CompressionCodec, InvalidMessageException, NoCompressionCodec}
    +import org.apache.log4j.Logger
     import org.apache.kafka.common.errors.InvalidTimestampException
     import org.apache.kafka.common.record._
    
    @@ -27,6 +28,7 @@ import scala.collection.mutable
     import scala.collection.JavaConverters._
    
     private[kafka] object LogValidator {
    +  private val log = Logger.getLogger(getClass())
    
       /**
        * Update the offsets for this message set and do further validation on messages including:
    @@ -172,8 +174,13 @@ private[kafka] object LogValidator {
             // Validate the timestamp
             validateTimestamp(record, now, messageTimestampType, messageTimestampDiffMaxMs)
             // Check if we need to overwrite offset, no in place assignment situation 3
    -        if (logEntry.offset != expectedInnerOffset.getAndIncrement())
    +        val incrementedInnerOffset = expectedInnerOffset.getAndIncrement()
    +        if (logEntry.offset != incrementedInnerOffset) {
    +          log.info("overwriting inner offset; got " + logEntry.offset + " expected " + incrementedInnerOffset)
               inPlaceAssignment = false
    +        } else {
    +          log.info("no need to overwrite offset; got " + logEntry.offset + " expected " + incrementedInnerOffset)
    +        }
             if (record.timestamp > maxTimestamp)
               maxTimestamp = record.timestamp
           }
    ```
    
    This produces the following broker logs:
    
    Before:
    ```
    [2017-12-13 13:30:04,667] INFO no need to overwrite offset; got 0 expected 0 (kafka.log.LogValidator$)
    [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 1 (kafka.log.LogValidator$)
    [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 2 (kafka.log.LogValidator$)
    [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 3 (kafka.log.LogValidator$)
    [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 4 (kafka.log.LogValidator$)
    [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 5 (kafka.log.LogValidator$)
    [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 6 (kafka.log.LogValidator$)
    [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 7 (kafka.log.LogValidator$)
    [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 8 (kafka.log.LogValidator$)
    [2017-12-13 13:30:01,465] INFO overwriting inner offset; got 0 expected 9 (kafka.log.LogValidator$)
    ```
    
    After:
    ```
    [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 0 expected 0 (kafka.log.LogValidator$)
    [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 1 expected 1 (kafka.log.LogValidator$)
    [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 2 expected 2 (kafka.log.LogValidator$)
    [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 3 expected 3 (kafka.log.LogValidator$)
    [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 4 expected 4 (kafka.log.LogValidator$)
    [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 5 expected 5 (kafka.log.LogValidator$)
    [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 6 expected 6 (kafka.log.LogValidator$)
    [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 7 expected 7 (kafka.log.LogValidator$)
    [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 8 expected 8 (kafka.log.LogValidator$)
    [2017-12-13 13:40:57,995] INFO no need to overwrite offset; got 9 expected 9 (kafka.log.LogValidator$)
    ```
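    Producer-side, the change amounts to numbering the inner messages of each compressed message set sequentially from zero before compressing, so the broker can validate the offsets in place instead of rewriting them. A minimal Go sketch of that idea (the `Message` type and `assignRelativeOffsets` helper here are illustrative, not Sarama's actual API):

    ```go
    package main

    import "fmt"

    // Message is a simplified stand-in for one inner message of a
    // compressed message set (hypothetical type, not Sarama's real struct).
    type Message struct {
    	Offset int64
    	Value  []byte
    }

    // assignRelativeOffsets gives each inner message a sequential relative
    // offset starting at 0. The broker can then assign the final offsets by
    // adding a base offset, without decompressing and recompressing the set.
    func assignRelativeOffsets(set []*Message) {
    	for i, msg := range set {
    		msg.Offset = int64(i)
    	}
    }

    func main() {
    	set := []*Message{
    		{Value: []byte("a")},
    		{Value: []byte("b")},
    		{Value: []byte("c")},
    	}
    	assignRelativeOffsets(set)
    	for _, m := range set {
    		fmt.Println(m.Offset)
    	}
    }
    ```

    With offsets 0, 1, 2 assigned in the producer, the broker's `expectedInnerOffset` check in the patched `LogValidator` above matches on every message, which is exactly what the "After" log shows.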
    emfree committed Dec 20, 2017
    Commit a12e79b
  2. Commit f0d0b0f