diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index efe04ef99d85c..4edc35b08b99b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.TaskId;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -47,12 +48,17 @@ public class TaskExecutor {
 
     private final Logger log;
+    private final boolean hasNamedTopologies;
     private final ProcessingMode processingMode;
     private final Tasks tasks;
 
-    public TaskExecutor(final Tasks tasks, final ProcessingMode processingMode, final LogContext logContext) {
+    public TaskExecutor(final Tasks tasks,
+                        final ProcessingMode processingMode,
+                        final boolean hasNamedTopologies,
+                        final LogContext logContext) {
         this.tasks = tasks;
         this.processingMode = processingMode;
+        this.hasNamedTopologies = hasNamedTopologies;
         this.log = logContext.logger(getClass());
     }
 
@@ -62,9 +68,16 @@ public TaskExecutor(final Tasks tasks, final ProcessingMode processingMode, fina
      */
     int process(final int maxNumRecords, final Time time) {
         int totalProcessed = 0;
-
-        for (final Task task : tasks.activeTasks()) {
-            totalProcessed += processTask(task, maxNumRecords, time);
+        Task lastProcessed = null;
+        try {
+            for (final Task task : tasks.activeTasks()) {
+                lastProcessed = task;
+                totalProcessed += processTask(task, maxNumRecords, time);
+            }
+        } catch (final Exception e) {
+            tasks.removeTaskFromSuccessfullyProcessedBeforeClosing(lastProcessed);
+            commitSuccessfullyProcessedTasks();
+            throw e;
         }
 
         return totalProcessed;
@@ -80,6 +93,10 @@ private long processTask(final Task task, final int maxNumRecords, final Time ti
                 task.clearTaskTimeout();
                 processed++;
             }
+            // TODO: enable regardless of whether using named topologies
+            if (hasNamedTopologies && processingMode != EXACTLY_ONCE_V2) {
+                tasks.addToSuccessfullyProcessed(task);
+            }
         } catch (final TimeoutException timeoutException) {
             task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
             log.debug(
@@ -139,8 +156,6 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCo
         return committed;
     }
 
-
-
     /**
      * Caution: do not invoke this directly if it's possible a rebalance is occurring, as the commit will fail. If
      * this is a possibility, prefer the {@link #commitTasksAndMaybeUpdateCommittableOffsets} instead.
@@ -234,6 +249,16 @@ private void updateTaskCommitMetadata(final Map<TopicPartition, OffsetAndMetadata> allOffsets) {
         }
     }
 
+    private void commitSuccessfullyProcessedTasks() {
+        if (!tasks.successfullyProcessed().isEmpty()) {
+            log.error("Streams encountered an error when processing tasks."
+                + " Will commit all previously successfully processed tasks: "
+                + tasks.successfullyProcessed());
+            commitTasksAndMaybeUpdateCommittableOffsets(tasks.successfullyProcessed(), new HashMap<>());
+        }
+        tasks.clearSuccessfullyProcessed();
+    }
+
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index fb1df2246214d..0ff6dc21ab2da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -116,7 +116,7 @@ public class TaskManager {
         this.log = logContext.logger(getClass());
 
         this.tasks = new Tasks(logContext, topologyMetadata, streamsMetrics, activeTaskCreator, standbyTaskCreator);
-        this.taskExecutor = new TaskExecutor(tasks, processingMode, logContext);
+        this.taskExecutor = new TaskExecutor(tasks, processingMode, topologyMetadata.hasNamedTopologies(), logContext);
     }
 
     void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index f283945676627..2740791f8f628 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -59,6 +59,7 @@ class Tasks {
     // TODO: change type to `StandbyTask`
     private final Map<TaskId, Task> readOnlyStandbyTasksPerId = Collections.unmodifiableMap(standbyTasksPerId);
     private final Set<TaskId> readOnlyStandbyTaskIds = Collections.unmodifiableSet(standbyTasksPerId.keySet());
+    private final Collection<Task> successfullyProcessed = new HashSet<>();
 
     private final ActiveTaskCreator activeTaskCreator;
     private final StandbyTaskCreator standbyTaskCreator;
@@ -319,6 +320,22 @@ Consumer<byte[], byte[]> mainConsumer() {
         return mainConsumer;
     }
 
+    Collection<Task> successfullyProcessed() {
+        return successfullyProcessed;
+    }
+
+    void addToSuccessfullyProcessed(final Task task) {
+        successfullyProcessed.add(task);
+    }
+
+    void removeTaskFromSuccessfullyProcessedBeforeClosing(final Task task) {
+        successfullyProcessed.remove(task);
+    }
+
+    void clearSuccessfullyProcessed() {
+        successfullyProcessed.clear();
+    }
+
     // for testing only
     void addTask(final Task task) {
         if (task.isActive()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index 63e0f273b99d7..2d04070b402be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
@@ -45,11 +47,14 @@
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 @Category(IntegrationTest.class)
 public class EmitOnChangeIntegrationTest {
@@ -70,7 +75,9 @@ public static void closeCluster() {
     public TestName testName = new TestName();
 
     private static String inputTopic;
+    private static String inputTopic2;
     private static String outputTopic;
+    private static String outputTopic2;
     private static String appId = "";
 
     @Before
@@ -78,8 +85,10 @@ public void setup() {
         final String testId = safeUniqueTestName(getClass(), testName);
         appId = "appId_" + testId;
         inputTopic = "input" + testId;
+        inputTopic2 = "input2" + testId;
         outputTopic = "output" + testId;
-        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic);
+        outputTopic2 = "output2" + testId;
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic, inputTopic2, outputTopic2);
     }
 
     @Test
@@ -110,6 +119,7 @@ public void shouldEmitSameRecordAfterFailover() throws Exception {
                 }
             })
             .to(outputTopic);
+        builder.stream(inputTopic2).to(outputTopic2);
 
         try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
             kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
@@ -128,6 +138,19 @@ public void shouldEmitSameRecordAfterFailover() throws Exception {
                     new Properties()),
                 0L);
 
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    StringSerializer.class,
+                    new Properties()),
+                0L);
+
             IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
                 TestUtils.consumerConfig(
                     CLUSTER.bootstrapServers(),
@@ -140,6 +163,91 @@ public void shouldEmitSameRecordAfterFailover() throws Exception {
                     new KeyValue<>(1, "B")
                 )
             );
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    StringDeserializer.class
+                ),
+                outputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                )
+            );
+        }
+    }
+
+    @Test
+    public void shouldEmitRecordsAfterFailures() throws Exception {
+        final Properties properties = mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
+                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
+                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
+                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
+            )
+        );
+
+        try (final KafkaStreamsNamedTopologyWrapper kafkaStreams = new KafkaStreamsNamedTopologyWrapper(properties)) {
+            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
+
+            final NamedTopologyBuilder builder = kafkaStreams.newNamedTopologyBuilder("topology_A");
+            final AtomicInteger noOutputExpected = new AtomicInteger(0);
+            final AtomicInteger twoOutputExpected = new AtomicInteger(0);
+            builder.stream(inputTopic2).peek((k, v) -> twoOutputExpected.incrementAndGet()).to(outputTopic2);
+            builder.stream(inputTopic)
+                .peek((k, v) -> {
+                    throw new RuntimeException("Kaboom");
+                })
+                .peek((k, v) -> noOutputExpected.incrementAndGet())
+                .to(outputTopic);
+
+            kafkaStreams.addNamedTopology(builder.build());
+
+            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic,
+                Arrays.asList(
+                    new KeyValue<>(1, "A")
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    StringSerializer.class,
+                    new Properties()),
+                0L);
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    StringSerializer.class,
+                    new Properties()),
+                0L);
+            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    StringDeserializer.class
+                ),
+                outputTopic2,
+                Arrays.asList(
+                    new KeyValue<>(1, "A"),
+                    new KeyValue<>(1, "B")
+                )
+            );
+            assertThat(noOutputExpected.get(), equalTo(0));
+            assertThat(twoOutputExpected.get(), equalTo(2));
         }
     }
 }
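
Note for readers skimming the patch: the heart of the TaskExecutor change is a try/catch around the per-task processing loop that, when one task throws, commits every task that completed its batch before rethrowing, so their progress survives the subsequent thread replacement. The minimal, self-contained sketch below mirrors only that control flow; all names in it (CommitOnErrorSketch, Task, process(), id(), the println standing in for the offset commit) are invented for illustration and are not the Kafka Streams internals.

// A plain-Java sketch (no Kafka dependencies) of the commit-on-error flow
// added to TaskExecutor.process() above.
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CommitOnErrorSketch {

    interface Task {
        String id();
        void process(); // throws a RuntimeException on a "poison pill" record
    }

    // mirrors Tasks#successfullyProcessed in the patch
    private final Set<Task> successfullyProcessed = new HashSet<>();

    int process(final List<Task> activeTasks) {
        int totalProcessed = 0;
        Task lastProcessed = null;
        try {
            for (final Task task : activeTasks) {
                lastProcessed = task;
                task.process();
                totalProcessed++;
                successfullyProcessed.add(task); // record per-task progress
            }
        } catch (final RuntimeException e) {
            // The task that threw must not be committed: its output for the
            // failed record may be partial, and committing would mark that
            // record as consumed.
            successfullyProcessed.remove(lastProcessed);
            commitSuccessfullyProcessedTasks();
            throw e; // rethrow so the thread's exception handler still runs
        }
        return totalProcessed;
    }

    private void commitSuccessfullyProcessedTasks() {
        // stand-in for commitTasksAndMaybeUpdateCommittableOffsets(...)
        for (final Task task : successfullyProcessed) {
            System.out.println("committing offsets for task " + task.id());
        }
        successfullyProcessed.clear();
    }
}

In the real patch the bookkeeping is additionally gated on hasNamedTopologies && processingMode != EXACTLY_ONCE_V2 (with an in-diff TODO to lift the named-topology restriction later), presumably because under exactly-once v2 offsets are committed through the producer's transaction, which is aborted when processing fails.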
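The new shouldEmitRecordsAfterFailures test exercises exactly this path: a single stream thread runs one named topology containing both a healthy stream (inputTopic2 to outputTopic2) and a poison-pill stream whose first peek() always throws, with REPLACE_THREAD as the uncaught-exception handler. The commit interval is set to 300 seconds so that regular interval commits cannot fire, leaving the new error-path commit as the only mechanism that can persist the healthy task's offsets before each thread replacement. The assertions then pin down both sides: noOutputExpected stays 0 because the throwing branch never reaches its second peek(), and twoOutputExpected equals exactly 2, which holds only if the healthy task's two records are not reprocessed by the replacement threads, i.e. only if they were committed on the error path.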