diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 92231da7e6ef5..2012cfdf09259 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -16,9 +16,14 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -32,6 +37,7 @@
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.KeyValue;
@@ -68,10 +74,10 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,6 +88,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -109,6 +116,7 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 @Timeout(600)
 @Tag("integration")
@@ -121,13 +129,20 @@ public class RestoreIntegrationTest {
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
+    private static Admin admin;
+
     @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
+
+        final Properties adminConfig = new Properties();
+        adminConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        admin = Admin.create(adminConfig);
     }
 
     @AfterAll
     public static void closeCluster() {
+        Utils.closeQuietly(admin, "admin");
         CLUSTER.stop();
     }
 
@@ -160,7 +175,7 @@ private Properties props(final Properties extraProperties) {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         streamsConfiguration.putAll(extraProperties);
 
         streamsConfigurations.add(streamsConfiguration);
@@ -178,11 +193,12 @@ public void shutdown() throws Exception {
         streamsConfigurations.clear();
     }
 
-    @Test
-    public void shouldRestoreNullRecord() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void shouldRestoreNullRecord(final boolean useNewProtocol) throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final String applicationId = "restoration-test-app";
+        final String applicationId = appId;
         final String stateStoreName = "stateStore";
         final String inputTopic = "input";
         final String outputTopic = "output";
@@ -196,6 +212,10 @@ public void shouldRestoreNullRecord() throws Exception {
                 Serdes.BytesSerde.class.getName(),
                 props);
 
+        if (useNewProtocol) {
+            streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
+        }
+
         CLUSTER.createTopics(inputTopic);
         CLUSTER.createTopics(outputTopic);
 
@@ -244,18 +264,28 @@
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final boolean stateUpdaterEnabled) throws Exception {
+    @CsvSource({
+        "true,true",
+        "true,false",
+        "false,true",
+        "false,false"
+    })
+    public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
         final AtomicInteger numReceived = new AtomicInteger(0);
         final Topology topology = new Topology();
 
         final Properties props = props(stateUpdaterEnabled);
+        if (useNewProtocol) {
+            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
+        }
 
         // restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions
         final int offsetLimitDelta = 1000;
         final int offsetCheckpointed = 1000;
         createStateForRestoration(inputStream, 0);
-        setCommittedOffset(inputStream, offsetLimitDelta);
+        if (!useNewProtocol) {
+            setCommittedOffset(inputStream, offsetLimitDelta, useNewProtocol);
+        }
 
         final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true, false);
         // note here the checkpointed offset is the last processed record's offset, so without control message we should write this offset - 1
@@ -264,7 +294,6 @@ public void shouldRestoreStateFromSourceTopicForReadOnlyStore(final boolean stat
         new OffsetCheckpoint(new File(stateDirectory.getOrCreateDirectoryForTask(new TaskId(0, 1)), ".checkpoint"))
             .write(Collections.singletonMap(new TopicPartition(inputStream, 1), (long) offsetCheckpointed - 1));
 
-        final CountDownLatch startupLatch = new CountDownLatch(1);
         final CountDownLatch shutdownLatch = new CountDownLatch(1);
 
         topology.addReadOnlyStateStore(
@@ -282,17 +311,23 @@
         );
 
         kafkaStreams = new KafkaStreams(topology, props);
-        kafkaStreams.setStateListener((newState, oldState) -> {
-            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
-                startupLatch.countDown();
-            }
-        });
 
         final AtomicLong restored = new AtomicLong(0);
         kafkaStreams.setGlobalStateRestoreListener(new TrackingStateRestoreListener(restored));
-        kafkaStreams.start();
+        startApplicationAndWaitUntilRunning(kafkaStreams);
+
+        if (useNewProtocol) {
+            // For new protocol, we need to stop the streams instance before altering offsets
+            kafkaStreams.close(Duration.ofSeconds(60));
+            setCommittedOffset(inputStream, offsetLimitDelta, useNewProtocol);
+
+            // Restart the streams instance with a new startup latch
+
+            kafkaStreams = new KafkaStreams(topology, props);
+            kafkaStreams.setGlobalStateRestoreListener(new TrackingStateRestoreListener(restored));
+            startApplicationAndWaitUntilRunning(kafkaStreams);
+        }
 
-        assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
         assertThat(restored.get(), equalTo((long) numberOfKeys - offsetLimitDelta * 2 - offsetCheckpointed * 2));
 
         assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
@@ -300,19 +335,29 @@
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean stateUpdaterEnabled) throws Exception {
+    @CsvSource({
+        "true,true",
+        "true,false",
+        "false,true",
+        "false,false"
+    })
+    public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
         final AtomicInteger numReceived = new AtomicInteger(0);
         final StreamsBuilder builder = new StreamsBuilder();
 
         final Properties props = props(stateUpdaterEnabled);
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+        if (useNewProtocol) {
+            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
+        }
 
         // restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions
         final int offsetLimitDelta = 1000;
         final int offsetCheckpointed = 1000;
         createStateForRestoration(inputStream, 0);
-        setCommittedOffset(inputStream, offsetLimitDelta);
+        if (!useNewProtocol) {
+            setCommittedOffset(inputStream, offsetLimitDelta, useNewProtocol);
+        }
 
         final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true, false);
         // note here the checkpointed offset is the last processed record's offset, so without control message we should write this offset - 1
@@ -344,6 +389,20 @@ public void shouldRestoreStateFromSourceTopicForGlobalTable(final boolean stateU
         kafkaStreams.start();
 
         assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
+
+        if (useNewProtocol) {
+            // For new protocol, we need to stop the streams instance before altering offsets
+            kafkaStreams.close();
+            setCommittedOffset(inputStream, offsetLimitDelta, useNewProtocol);
+
+            // Restart the streams instance with a new startup latch
+            kafkaStreams = new KafkaStreams(builder.build(props), props);
+
+            kafkaStreams.setGlobalStateRestoreListener(new TrackingStateRestoreListener(restored));
+            startApplicationAndWaitUntilRunning(kafkaStreams);
+
+        }
+
         assertThat(restored.get(), equalTo((long) numberOfKeys - offsetLimitDelta * 2 - offsetCheckpointed * 2));
 
         assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
@@ -351,8 +410,13 @@
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldRestoreStateFromChangelogTopic(final boolean stateUpdaterEnabled) throws Exception {
+    @CsvSource({
+        "true,true",
+        "true,false",
+        "false,true",
+        "false,false"
+    })
+    public void shouldRestoreStateFromChangelogTopic(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
         final String changelog = appId + "-store-changelog";
         CLUSTER.createTopic(changelog, 2, 1);
 
@@ -361,6 +425,10 @@ public void shouldRestoreStateFromChangelogTopic(final boolean stateUpdaterEnabl
 
         final Properties props = props(stateUpdaterEnabled);
+        if (useNewProtocol) {
+            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
+        }
+
         // restoring from 1000 to 5000, and then process from 5000 to 10000 on each of the two partitions
         final int offsetCheckpointed = 1000;
         createStateForRestoration(changelog, 0);
 
@@ -403,8 +471,13 @@ public void shouldRestoreStateFromChangelogTopic(final boolean stateUpdaterEnabl
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean stateUpdaterEnabled) throws InterruptedException {
+    @CsvSource({
+        "true,true",
+        "true,false",
+        "false,true",
+        "false,false"
+    })
+    public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws InterruptedException {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<Integer, Integer> stream = builder.stream(inputStream);
@@ -414,23 +487,26 @@ public void shouldSuccessfullyStartWhenLoggingDisabled(final boolean stateUpdate
             Integer::sum,
             Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled()
         );
-
-        final CountDownLatch startupLatch = new CountDownLatch(1);
-        kafkaStreams = new KafkaStreams(builder.build(), props(stateUpdaterEnabled));
-        kafkaStreams.setStateListener((newState, oldState) -> {
-            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
-                startupLatch.countDown();
-            }
-        });
-
-        kafkaStreams.start();
-
-        assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
+        final Properties props = props(stateUpdaterEnabled);
+        if (useNewProtocol) {
+            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
+        }
+        kafkaStreams = new KafkaStreams(builder.build(), props);
+        try {
+            startApplicationAndWaitUntilRunning(kafkaStreams);
+        } catch (final Exception e) {
+            fail("Failed to start KafkaStreams", e);
+        }
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean stateUpdaterEnabled) throws InterruptedException {
+    @CsvSource({
+        "true,true",
+        "true,false",
+        "false,true",
+        "false,false"
+    })
+    public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws InterruptedException {
         IntegrationTestUtils.produceKeyValuesSynchronously(inputStream,
             asList(KeyValue.pair(1, 1),
                 KeyValue.pair(2, 2),
@@ -458,7 +534,12 @@ public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean stateUp
 
         final Topology topology = streamsBuilder.build();
 
-        kafkaStreams = new KafkaStreams(topology, props(stateUpdaterEnabled));
+        final Properties props = props(stateUpdaterEnabled);
+
+        if (useNewProtocol) {
+            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
+        }
+        kafkaStreams = new KafkaStreams(topology, props);
 
         final CountDownLatch latch = new CountDownLatch(1);
         kafkaStreams.setStateListener((newState, oldState) -> {
@@ -474,8 +555,13 @@ public void shouldProcessDataFromStoresWithLoggingDisabled(final boolean stateUp
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final boolean stateUpdaterEnabled) throws Exception {
+    @CsvSource({
+        "true,true",
+        "true,false",
+        "false,true",
+        "false,false"
+    })
+    public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(final boolean stateUpdaterEnabled, final boolean useNewProtocol) throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(
             inputStream,
@@ -483,15 +569,25 @@ public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(f
         );
         createStateForRestoration(inputStream, 0);
 
+        if (useNewProtocol) {
+            CLUSTER.setStandbyReplicas(appId, 1);
+        }
+
         final Properties props1 = props(stateUpdaterEnabled);
         props1.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         props1.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-1").getPath());
+        if (useNewProtocol) {
+            props1.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
+        }
         purgeLocalStreamsState(props1);
         final KafkaStreams streams1 = new KafkaStreams(builder.build(), props1);
 
         final Properties props2 = props(stateUpdaterEnabled);
         props2.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         props2.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-2").getPath());
+        if (useNewProtocol) {
+            props2.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
+        }
         purgeLocalStreamsState(props2);
         final KafkaStreams streams2 = new KafkaStreams(builder.build(), props2);
 
@@ -513,19 +609,19 @@ public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(f
             waitForStandbyCompletion(streams1, 1, 30 * 1000L);
             waitForStandbyCompletion(streams2, 1, 30 * 1000L);
         } catch (final Exception e) {
-            streams1.close();
-            streams2.close();
+            streams1.close(Duration.ofSeconds(60));
+            streams2.close(Duration.ofSeconds(60));
+            throw e;
         }
 
         // Sometimes the store happens to have already been closed sometime during startup, so just keep track
         // of where it started and make sure it doesn't happen more times from there
         final int initialStoreCloseCount = CloseCountingInMemoryStore.numStoresClosed();
         final long initialNunRestoredCount = restoreListener.totalNumRestored();
 
-        transitionedStates1.clear();
         transitionedStates2.clear();
         try {
-            streams2.close();
+            streams2.close(Duration.ofSeconds(60));
             waitForTransitionTo(transitionedStates2, State.NOT_RUNNING, Duration.ofSeconds(60));
             waitForTransitionTo(transitionedStates1, State.REBALANCING, Duration.ofSeconds(60));
             waitForTransitionTo(transitionedStates1, State.RUNNING, Duration.ofSeconds(60));
@@ -535,18 +631,20 @@ public void shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(f
             assertThat(restoreListener.totalNumRestored(), CoreMatchers.equalTo(initialNunRestoredCount));
 
-            // After stopping instance 2 and letting instance 1 take over its tasks, we should have closed just two stores
-            // total: the active and standby tasks on instance 2
-            assertThat(CloseCountingInMemoryStore.numStoresClosed(), equalTo(initialStoreCloseCount + 2));
+            // After stopping instance 2 and letting instance 1 take over its tasks, we should have closed the stores on instance 2.
+            // Under the new group protocol, an extra store close can occur during rebalance; account for that here.
+            final int expectedAfterStreams2Close = initialStoreCloseCount + (useNewProtocol ? 3 : 2);
+            assertThat(CloseCountingInMemoryStore.numStoresClosed(), equalTo(expectedAfterStreams2Close));
         } finally {
-            streams1.close();
+            streams1.close(Duration.ofSeconds(60));
         }
 
         waitForTransitionTo(transitionedStates1, State.NOT_RUNNING, Duration.ofSeconds(60));
         assertThat(CloseCountingInMemoryStore.numStoresClosed(), CoreMatchers.equalTo(initialStoreCloseCount + 4));
     }
 
-    @Test
-    public void shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void shouldInvokeUserDefinedGlobalStateRestoreListener(final boolean useNewProtocol) throws Exception {
         final String inputTopic = "inputTopic";
         final String outputTopic = "outputTopic";
         CLUSTER.createTopic(inputTopic, 5, 1);
@@ -575,7 +673,7 @@ public void shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception
 
         sendEvents(inputTopic, sampleData);
 
-        kafkaStreams = startKafkaStreams(builder, null, kafkaStreams1Configuration);
+        kafkaStreams = startKafkaStreams(builder, null, kafkaStreams1Configuration, useNewProtocol);
 
         validateReceivedMessages(sampleData, outputTopic);
 
@@ -584,7 +682,7 @@ public void shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);
 
         final TestStateRestoreListener kafkaStreams1StateRestoreListener = new TestStateRestoreListener("ks1", RESTORATION_DELAY);
-        kafkaStreams = startKafkaStreams(builder, kafkaStreams1StateRestoreListener, kafkaStreams1Configuration);
+        kafkaStreams = startKafkaStreams(builder, kafkaStreams1StateRestoreListener, kafkaStreams1Configuration, useNewProtocol);
 
         // Ensure all the restoring tasks are in active state before starting the new instance.
         // Otherwise, the tasks which assigned to first kafka streams won't encounter "restoring suspend" after being reassigned to the second instance.
@@ -600,7 +698,8 @@ public void shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception
 
         try (final KafkaStreams kafkaStreams2 = startKafkaStreams(builder,
                 kafkaStreams2StateRestoreListener,
-                kafkaStreams2Configuration)) {
+                kafkaStreams2Configuration,
+                useNewProtocol)) {
 
             waitForCondition(() -> State.RUNNING == kafkaStreams2.state(),
                 90_000,
@@ -639,8 +738,12 @@ private void validateReceivedMessages(final List<KeyValue<Integer, Integer>> exp
 
     private KafkaStreams startKafkaStreams(final StreamsBuilder streamsBuilder,
                                            final StateRestoreListener stateRestoreListener,
-                                           final Map<String, Object> extraConfiguration) {
+                                           final Map<String, Object> extraConfiguration,
+                                           final boolean useNewProtocol) {
         final Properties streamsConfiguration = props(mkObjectProperties(extraConfiguration));
+        if (useNewProtocol) {
+            streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
+        }
 
         final KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamsConfiguration);
         kafkaStreams.setGlobalStateRestoreListener(stateRestoreListener);
@@ -814,29 +917,54 @@ private void createStateForRestoration(final String changelogTopic, final int st
         }
     }
 
-    private void setCommittedOffset(final String topic, final int limitDelta) {
-        final Properties consumerConfig = new Properties();
-        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appId);
-        consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "commit-consumer");
-        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-
-        final Consumer<Integer, Integer> consumer = new KafkaConsumer<>(consumerConfig);
-        final List<TopicPartition> partitions = asList(
-            new TopicPartition(topic, 0),
-            new TopicPartition(topic, 1));
-
-        consumer.assign(partitions);
-        consumer.seekToEnd(partitions);
-
-        for (final TopicPartition partition : partitions) {
-            final long position = consumer.position(partition);
-            consumer.seek(partition, position - limitDelta);
+    private void setCommittedOffset(final String topic, final int limitDelta, final boolean useNewProtocol) {
+        if (!useNewProtocol) {
+            final Properties consumerConfig = new Properties();
+            consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+            consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appId);
+            consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "commit-consumer");
+            consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+            consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+
+            try (final Consumer<Integer, Integer> consumer = new KafkaConsumer<>(consumerConfig)) {
+                final List<TopicPartition> partitions = asList(
+                    new TopicPartition(topic, 0),
+                    new TopicPartition(topic, 1));
+
+                consumer.assign(partitions);
+                consumer.seekToEnd(partitions);
+
+                for (final TopicPartition partition : partitions) {
+                    final long position = consumer.position(partition);
+                    consumer.seek(partition, position - limitDelta);
+                }
+
+                consumer.commitSync();
+            }
+        } else {
+            try {
+                final List<TopicPartition> partitions = asList(
+                    new TopicPartition(topic, 0),
+                    new TopicPartition(topic, 1));
+
+                final Map<TopicPartition, OffsetSpec> offsetSpecs = partitions.stream()
+                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
+
+                final Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
+                    admin.listOffsets(offsetSpecs).all().get();
+
+                final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+                for (final TopicPartition partition : partitions) {
+                    final long endOffset = endOffsets.get(partition).offset();
+                    final long targetOffset = Math.max(0, endOffset - limitDelta);
+                    offsetsToCommit.put(partition, new OffsetAndMetadata(targetOffset));
+                }
+
+                admin.alterStreamsGroupOffsets(appId, offsetsToCommit).all().get();
+            } catch (final Exception e) {
+                fail("Failed to set committed offsets", e);
+            }
         }
-
-        consumer.commitSync();
-        consumer.close();
     }
 
     private void waitForTransitionTo(final Set<KafkaStreams.State> observed, final KafkaStreams.State state, final Duration timeout) throws Exception {
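
// Illustrative sketch only, not part of the patch: the offset-rewind pattern that the new
// setCommittedOffset branch relies on under the streams group protocol. The topic name, the
// group id, and the rewind distance below are hypothetical placeholders; the only Admin calls
// assumed are the ones already used in the patch (listOffsets with OffsetSpec.latest() and
// alterStreamsGroupOffsets), and `admin` stands for the test's static Admin client. As the
// patch notes, the Streams instance must be stopped before the group's offsets can be altered.
final Map<TopicPartition, OffsetSpec> latestSpecs = new HashMap<>();
latestSpecs.put(new TopicPartition("input", 0), OffsetSpec.latest());
latestSpecs.put(new TopicPartition("input", 1), OffsetSpec.latest());

// Resolve the current log-end offset of each partition.
final Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
    admin.listOffsets(latestSpecs).all().get();

// Rewind each partition 1000 records from the log end, never below offset 0.
final Map<TopicPartition, OffsetAndMetadata> rewound = new HashMap<>();
for (final Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> entry : endOffsets.entrySet()) {
    rewound.put(entry.getKey(), new OffsetAndMetadata(Math.max(0, entry.getValue().offset() - 1000)));
}

// Commit the rewound positions as the streams group's committed offsets.
admin.alterStreamsGroupOffsets("restoration-test-app", rewound).all().get();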