diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index cecacb41eefca..6868952069d16 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -548,6 +548,7 @@ public void run() {
             failedStreamThreadSensor.record();
             requestLeaveGroupDuringShutdown();
             streamsUncaughtExceptionHandler.accept(e, false);
+            // Note: the above call currently rethrows the exception, so nothing below this line will be executed
         } finally {
             completeShutdown(cleanRun);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
index 63b53aebdcf84..f3422537f991d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
@@ -31,6 +31,9 @@
  * shared between all StreamThreads.
  */
 public class TaskExecutionMetadata {
+    // TODO: implement exponential backoff, for now we just wait 5s
+    private static final long CONSTANT_BACKOFF_MS = 5_000L;
+
     private final boolean hasNamedTopologies;
     // map of topologies experiencing errors/currently under backoff
     private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();
@@ -58,7 +61,7 @@ public void registerTaskError(final Task task, final Throwable t, final long now
         }
     }

-    class NamedTopologyMetadata {
+    private class NamedTopologyMetadata {
         private final Logger log;
         private final Map<TaskId, Long> tasksToErrorTime = new ConcurrentHashMap<>();
@@ -73,11 +76,10 @@ public boolean canProcess() {
         }

         public boolean canProcessTask(final Task task, final long now) {
-            // TODO: implement exponential backoff, for now we just wait 15s
             final Long errorTime = tasksToErrorTime.get(task.id());
             if (errorTime == null) {
                 return true;
-            } else if (now - errorTime > 15000L) {
+            } else if (now - errorTime > CONSTANT_BACKOFF_MS) {
                 log.info("End backoff for task {} at t={}", task.id(), now);
                 tasksToErrorTime.remove(task.id());
                 if (tasksToErrorTime.isEmpty()) {
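For context, the gate that canProcessTask implements reduces to a small map-based
pattern: record the wall-clock time of a task's last error, refuse to process while
the constant backoff window is open, and clear the entry once it elapses. A minimal
standalone sketch of that pattern (keyed by a plain String instead of the real
TaskId, with logging and the surrounding topology bookkeeping omitted; this is a
hypothetical stand-in, not the actual Kafka class):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical stand-in for the gate in TaskExecutionMetadata above.
    class ConstantBackoffGate {
        private static final long CONSTANT_BACKOFF_MS = 5_000L;
        private final Map<String, Long> taskToErrorTime = new ConcurrentHashMap<>();

        // Called when a task throws: open (or re-open) its backoff window.
        void registerTaskError(final String taskId, final long now) {
            taskToErrorTime.put(taskId, now);
        }

        // Called before processing: a task may run if it has no open window,
        // or if its constant backoff has fully elapsed.
        boolean canProcessTask(final String taskId, final long now) {
            final Long errorTime = taskToErrorTime.get(taskId);
            if (errorTime == null) {
                return true;
            } else if (now - errorTime > CONSTANT_BACKOFF_MS) {
                taskToErrorTime.remove(taskId);
                return true;
            }
            return false;
        }
    }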
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
deleted file mode 100644
index 25f0f3055042d..0000000000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
-import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-
-@Category(IntegrationTest.class)
-public class EmitOnChangeIntegrationTest {
-
-    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
-
-    @BeforeClass
-    public static void startCluster() throws IOException {
-        CLUSTER.start();
-    }
-
-    @AfterClass
-    public static void closeCluster() {
-        CLUSTER.stop();
-    }
-
-    @Rule
-    public TestName testName = new TestName();
-
-    private static String inputTopic;
-    private static String inputTopic2;
-    private static String outputTopic;
-    private static String outputTopic2;
-    private static String appId = "";
-
-    @Before
-    public void setup() {
-        final String testId = safeUniqueTestName(getClass(), testName);
-        appId = "appId_" + testId;
-        inputTopic = "input" + testId;
-        inputTopic2 = "input2" + testId;
-        outputTopic = "output" + testId;
-        outputTopic2 = "output2" + testId;
-        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic, inputTopic2, outputTopic2);
-    }
-
-    @Test
-    public void shouldEmitSameRecordAfterFailover() throws Exception {
-        final Properties properties = mkObjectProperties(
-            mkMap(
-                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
-                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
-                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
-                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
-                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
-                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
-                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
-                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
-                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
-            )
-        );
-
-        final AtomicBoolean shouldThrow = new AtomicBoolean(true);
-        final StreamsBuilder builder = new StreamsBuilder();
-        builder.table(inputTopic, Materialized.as("test-store"))
-            .toStream()
-            .map((key, value) -> {
-                if (shouldThrow.compareAndSet(true, false)) {
-                    throw new RuntimeException("Kaboom");
-                } else {
-                    return new KeyValue<>(key, value);
-                }
-            })
-            .to(outputTopic);
-        builder.stream(inputTopic2).to(outputTopic2);
-
-        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
-            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
-            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                inputTopic,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    StringSerializer.class,
-                    new Properties()),
-                0L);
-
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                inputTopic2,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    StringSerializer.class,
-                    new Properties()),
-                0L);
-
-            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
-                TestUtils.consumerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerDeserializer.class,
-                    StringDeserializer.class
-                ),
-                outputTopic,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                )
-            );
-            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
-                TestUtils.consumerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerDeserializer.class,
-                    StringDeserializer.class
-                ),
-                outputTopic2,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                )
-            );
-        }
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
deleted file mode 100644
index f889e3757626c..0000000000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
-import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
-import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-@Category(IntegrationTest.class)
-public class ErrorHandlingIntegrationTest {
-
-    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
-
-    @BeforeClass
-    public static void startCluster() throws IOException {
-        CLUSTER.start();
-    }
-
-    @AfterClass
-    public static void closeCluster() {
-        CLUSTER.stop();
-    }
-
-    @Rule
-    public TestName testName = new TestName();
-
-    private final String testId = safeUniqueTestName(getClass(), testName);
-    private final String appId = "appId_" + testId;
-    private final Properties properties = props();
-
-    // Task 0
-    private final String inputTopic = "input" + testId;
-    private final String outputTopic = "output" + testId;
-    // Task 1
-    private final String errorInputTopic = "error-input" + testId;
-    private final String errorOutputTopic = "error-output" + testId;
-
-    @Before
-    public void setup() {
-        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, errorInputTopic, errorOutputTopic, inputTopic, outputTopic);
-    }
-
-    private Properties props() {
-        return mkObjectProperties(
-            mkMap(
-                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
-                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
-                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()),
-                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
-                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L),
-                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
-                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class))
-        );
-    }
-
-    @Test
-    public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception {
-        final AtomicInteger noOutputExpected = new AtomicInteger(0);
-        final AtomicInteger outputExpected = new AtomicInteger(0);
-
-        try (final KafkaStreamsNamedTopologyWrapper kafkaStreams = new KafkaStreamsNamedTopologyWrapper(properties)) {
-            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
-
-            final NamedTopologyBuilder builder = kafkaStreams.newNamedTopologyBuilder("topology_A");
-            builder.stream(inputTopic).peek((k, v) -> outputExpected.incrementAndGet()).to(outputTopic);
-            builder.stream(errorInputTopic)
-                .peek((k, v) -> {
-                    throw new RuntimeException("Kaboom");
-                })
-                .peek((k, v) -> noOutputExpected.incrementAndGet())
-                .to(errorOutputTopic);
-
-            kafkaStreams.addNamedTopology(builder.build());
-
-            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                errorInputTopic,
-                Arrays.asList(
-                    new KeyValue<>(1, "A")
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    StringSerializer.class,
-                    new Properties()),
-                0L);
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                inputTopic,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    StringSerializer.class,
-                    new Properties()),
-                0L);
-            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
-                TestUtils.consumerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerDeserializer.class,
-                    StringDeserializer.class
-                ),
-                outputTopic,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                )
-            );
-            assertThat(noOutputExpected.get(), equalTo(0));
-            assertThat(outputExpected.get(), equalTo(2));
-        }
-    }
-}
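Both the deleted test above and its replacements below rely on the same recovery
wiring: a streams uncaught exception handler that returns REPLACE_THREAD, so a
poison-pill record kills only the current StreamThread and a fresh thread resumes
from the last committed offsets. A minimal sketch of that wiring (application id,
bootstrap server, and topic names are placeholders, not taken from this patch):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

    public class ReplaceThreadExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "replace-thread-example"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder

            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input").to("output"); // placeholder topics

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            // Any exception that escapes a StreamThread is routed here; returning
            // REPLACE_THREAD tears the failed thread down and starts a new one.
            streams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
            streams.start();
        }
    }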
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index 5db1585a09922..7e746f121602e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -18,6 +18,8 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -34,6 +36,7 @@
 import org.apache.kafka.streams.errors.MissingSourceTopicException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -53,8 +56,10 @@
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
 import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;

+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
@@ -73,6 +78,7 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -775,6 +781,70 @@ public void shouldWaitForMissingInputTopicsToBeCreated() throws Exception {
         }
     }

+    @Test
+    public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception {
+        final AtomicInteger noOutputExpected = new AtomicInteger(0);
+        final AtomicInteger outputExpected = new AtomicInteger(0);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L);
+        props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+        streams = new KafkaStreamsNamedTopologyWrapper(props);
+        streams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
+
+        final NamedTopologyBuilder builder = streams.newNamedTopologyBuilder("topology_A");
+        builder.stream(DELAYED_INPUT_STREAM_1).peek((k, v) -> outputExpected.incrementAndGet()).to(OUTPUT_STREAM_1);
+        builder.stream(DELAYED_INPUT_STREAM_2)
+            .peek((k, v) -> {
+                throw new RuntimeException("Kaboom");
+            })
+            .peek((k, v) -> noOutputExpected.incrementAndGet())
+            .to(OUTPUT_STREAM_2);
+
+        streams.addNamedTopology(builder.build());
+
+        StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(streams);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            DELAYED_INPUT_STREAM_2,
+            Arrays.asList(
+                new KeyValue<>(1, "A")
+            ),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            0L);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            DELAYED_INPUT_STREAM_1,
+            Arrays.asList(
+                new KeyValue<>(1, "A"),
+                new KeyValue<>(1, "B")
+            ),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            0L);
+        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+            TestUtils.consumerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerDeserializer.class,
+                StringDeserializer.class
+            ),
+            OUTPUT_STREAM_1,
+            Arrays.asList(
+                new KeyValue<>(1, "A"),
+                new KeyValue<>(1, "B")
+            )
+        );
+        assertThat(noOutputExpected.get(), equalTo(0));
+        assertThat(outputExpected.get(), equalTo(2));
+    }
+
     /**
      * Validates that each metadata object has only partitions & state stores for its specific topology name and
      * asserts that {@code left} and {@code right} differ only by {@link StreamsMetadata#hostInfo()}
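The new test above exercises the isolation that TaskExecutionMetadata provides: only
the tasks of the sub-topology that hits the poison pill are backed off, while the
healthy sub-topology keeps emitting. Stripped of the test harness, the wiring it
uses looks roughly like this (a sketch only: topic names and props are placeholders,
and KafkaStreamsNamedTopologyWrapper is an internal, evolving API):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
    import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
    import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;

    public class NamedTopologyBackoffExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "named-topology-example"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder

            final KafkaStreamsNamedTopologyWrapper streams = new KafkaStreamsNamedTopologyWrapper(props);
            streams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);

            final NamedTopologyBuilder topologyA = streams.newNamedTopologyBuilder("topology_A");
            topologyA.stream("healthy-input").to("healthy-output"); // keeps flowing during backoff
            topologyA.stream("failing-input")
                     .peek((k, v) -> { throw new RuntimeException("Kaboom"); }) // fails on every record
                     .to("failing-output"); // never reached while the task is backed off

            streams.addNamedTopology(topologyA.build());
            streams.start();
        }
    }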
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 74d6ba9fecd34..0f42d3546fdba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -18,7 +18,10 @@
 package org.apache.kafka.streams.integration;

 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -26,10 +29,12 @@
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
@@ -83,35 +88,28 @@ public static void startCluster() throws IOException {
     public static void closeCluster() {
         CLUSTER.stop();
     }
-    public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
-
-    @Rule
-    public TestName testName = new TestName();
-    private static String inputTopic;
-    private static StreamsBuilder builder;
-    private static Properties properties;
-    private static List<String> processorValueCollector;
-    private static String appId = "";
+    public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);

     private static final AtomicBoolean THROW_ERROR = new AtomicBoolean(true);
     private static final AtomicBoolean THROW_ILLEGAL_STATE_EXCEPTION = new AtomicBoolean(false);
     private static final AtomicBoolean THROW_ILLEGAL_ARGUMENT_EXCEPTION = new AtomicBoolean(false);

-    @Before
-    public void setup() {
-        final String testId = safeUniqueTestName(getClass(), testName);
-        appId = "appId_" + testId;
-        inputTopic = "input" + testId;
-        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
-
-        builder = new StreamsBuilder();
+    @Rule
+    public final TestName testName = new TestName();

-        processorValueCollector = new ArrayList<>();
+    private final String testId = safeUniqueTestName(getClass(), testName);
+    private final String appId = "appId_" + testId;
+    private final String inputTopic = "input" + testId;
+    private final String inputTopic2 = "input2" + testId;
+    private final String outputTopic = "output" + testId;
+    private final String outputTopic2 = "output2" + testId;
+    private final StreamsBuilder builder = new StreamsBuilder();
+    private final List<String> processorValueCollector = new ArrayList<>();

-        final KStream<String, String> stream = builder.stream(inputTopic);
-        stream.process(() -> new ShutdownProcessor(processorValueCollector), Named.as("process"));
+    private final Properties properties = basicProps();

-        properties = mkObjectProperties(
+    private Properties basicProps() {
+        return mkObjectProperties(
             mkMap(
                 mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
                 mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
@@ -124,6 +122,13 @@ public void setup() {
         );
     }

+    @Before
+    public void setup() {
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic, inputTopic2, outputTopic, outputTopic2);
+        final KStream<String, String> stream = builder.stream(inputTopic);
+        stream.process(() -> new ShutdownProcessor(processorValueCollector), Named.as("process"));
+    }
+
     @After
     public void teardown() throws IOException {
         purgeLocalStreamsState(properties);
@@ -228,7 +233,6 @@ public void shouldShutdownSingleThreadApplication() throws InterruptedException

     @Test
     public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread() throws InterruptedException {
-        builder = new StreamsBuilder();
         builder.addGlobalStore(
             new KeyValueStoreBuilder<>(
                 Stores.persistentKeyValueStore("globalStore"),
@@ -236,7 +240,7 @@ public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread() throw
                 Serdes.String(),
                 CLUSTER.time
             ),
-            inputTopic,
+            inputTopic2,
             Consumed.with(Serdes.String(), Serdes.String()),
             () -> new ShutdownProcessor(processorValueCollector)
         );
@@ -248,7 +252,7 @@ public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread() throw

         StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);

-        produceMessages(0L, inputTopic, "A");
+        produceMessages(0L, inputTopic2, "A");
         waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);

         assertThat(processorValueCollector.size(), equalTo(1));
@@ -256,6 +260,86 @@ public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread() throw
     }

+    @Test
+    public void shouldEmitSameRecordAfterFailover() throws Exception {
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L);
+        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
+        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
+
+        final AtomicBoolean shouldThrow = new AtomicBoolean(true);
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table(inputTopic, Materialized.as("test-store"))
+            .toStream()
+            .map((key, value) -> {
+                if (shouldThrow.compareAndSet(true, false)) {
+                    throw new RuntimeException("Kaboom");
+                } else {
+                    return new KeyValue<>(key, value);
+                }
+            })
+            .to(outputTopic);
+        builder.stream(inputTopic2).to(outputTopic2);
+
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    StringSerializer.class,
+                    new Properties()),
+                0L);
+
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    StringSerializer.class,
+                    new Properties()),
+                0L);
+
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    StringDeserializer.class
+                ),
+                outputTopic,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                )
+            );
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    StringDeserializer.class
+                ),
+                outputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                )
+            );
+        }
+    }
+
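A note on the throw-exactly-once idiom used in the test above: AtomicBoolean.compareAndSet(true, false)
succeeds only for the very first record that reaches the map step, so the crash is
injected exactly once; after the thread is replaced, the uncommitted record is
reprocessed (the 300 s commit interval ensures nothing was committed before the
crash) and passes through cleanly the second time. The idiom in isolation, as a
self-contained demo:

    import java.util.concurrent.atomic.AtomicBoolean;

    public class ThrowOnceDemo {
        private static final AtomicBoolean SHOULD_THROW = new AtomicBoolean(true);

        static String process(final String value) {
            // compareAndSet(true, false) wins only once across all attempts,
            // so the failure is injected exactly once.
            if (SHOULD_THROW.compareAndSet(true, false)) {
                throw new RuntimeException("Kaboom");
            }
            return value;
        }

        public static void main(final String[] args) {
            try {
                process("A"); // first attempt: throws
            } catch (final RuntimeException e) {
                System.out.println("first attempt failed: " + e.getMessage());
            }
            System.out.println("retry succeeds: " + process("A")); // reprocessing succeeds
        }
    }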
     private void produceMessages(final long timestamp, final String streamOneInput, final String msg) {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             streamOneInput,