diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java index ea48c6461732a..1177e29d8259c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java @@ -174,7 +174,7 @@ public static String extractThreadId(final String fullThreadName) { public static long producerRecordSizeInBytes(final ProducerRecord record) { return recordSizeInBytes( - record.key().length, + record.key() == null ? 0 : record.key().length, record.value() == null ? 0 : record.value().length, record.topic(), record.headers() @@ -190,10 +190,10 @@ public static long consumerRecordSizeInBytes(final ConsumerRecord 12 bytes - new RecordHeader("h2", "headerVal2".getBytes()) - )); // 2 + 10 --> 12 bytes + new RecordHeader("h2", "headerVal2".getBytes()) // 2 + 10 --> 12 bytes + )); private static final int HEADERS_BYTES = 24; + // 20 bytes private static final int RECORD_METADATA_BYTES = 8 + // timestamp 8 + // offset @@ -86,6 +87,14 @@ public class ClientUtilsTest { HEADERS_BYTES + RECORD_METADATA_BYTES; + // 54 bytes + private static final long NULL_KEY_SIZE_IN_BYTES = + VALUE_BYTES + + TOPIC_BYTES + + HEADERS_BYTES + + RECORD_METADATA_BYTES; + + // 52 bytes private static final long TOMBSTONE_SIZE_IN_BYTES = KEY_BYTES + TOPIC_BYTES + @@ -202,6 +211,37 @@ public void shouldComputeSizeInBytesForProducerRecord() { assertThat(producerRecordSizeInBytes(record), equalTo(SIZE_IN_BYTES)); } + @Test + public void shouldComputeSizeInBytesForConsumerRecordWithNullKey() { + final ConsumerRecord record = new ConsumerRecord<>( + TOPIC, + 1, + 0, + 0L, + TimestampType.CREATE_TIME, + 0, + 5, + null, + VALUE, + HEADERS, + Optional.empty() + ); + assertThat(consumerRecordSizeInBytes(record), equalTo(NULL_KEY_SIZE_IN_BYTES)); + } + + @Test + public void shouldComputeSizeInBytesForProducerRecordWithNullKey() { + final ProducerRecord record = new ProducerRecord<>( + TOPIC, + 1, + 0L, + null, + VALUE, + HEADERS + ); + assertThat(producerRecordSizeInBytes(record), equalTo(NULL_KEY_SIZE_IN_BYTES)); + } + @Test public void shouldComputeSizeInBytesForConsumerRecordWithNullValue() { final ConsumerRecord record = new ConsumerRecord<>( diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java index f39b2b554a630..84b8c5ebd6024 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java @@ -31,8 +31,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.time.Duration; import java.time.Instant; @@ -55,8 +53,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; public class TestTopicsTest { - private static final Logger log = LoggerFactory.getLogger(TestTopicsTest.class); - private final static String INPUT_TOPIC = "input"; private final static String OUTPUT_TOPIC = "output1"; private final static String INPUT_TOPIC_MAP = OUTPUT_TOPIC; @@ -171,15 +167,13 @@ public void testKeyValuesToMap() { } @Test - public void testPipeInputWithNullKey() { + public void testKeyValuesToMapWithNull() { final TestInputTopic inputTopic = testDriver.createInputTopic(INPUT_TOPIC, longSerde.serializer(), stringSerde.serializer()); final TestOutputTopic outputTopic = testDriver.createOutputTopic(OUTPUT_TOPIC, longSerde.deserializer(), stringSerde.deserializer()); - final StreamsException exception = assertThrows(StreamsException.class, () -> inputTopic.pipeInput("value")); - assertThat(exception.getCause() instanceof NullPointerException, is(true)); - assertThat(outputTopic.readKeyValuesToMap().isEmpty(), is(true)); - + inputTopic.pipeInput("value"); + assertThrows(IllegalStateException.class, outputTopic::readKeyValuesToMap); } @Test