diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java index 6842c7718e10..c3531009cf2b 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java @@ -42,6 +42,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -59,6 +60,7 @@ import java.util.function.Supplier; import java.util.stream.Stream; +import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -158,16 +160,16 @@ public void cleanup() throws InterruptedException { @ParameterizedTest @MethodSource("data") - public void testStoreConfig(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) throws Exception { + public void testStoreConfig(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward, final TestInfo testInfo) throws Exception { + final String appID = safeUniqueTestName(testInfo); final StreamsBuilder builder = new StreamsBuilder(); final Materialized> stateStoreConfig = getStoreConfig(storeType, enableLogging, enableCaching); builder.table(inputStream, stateStoreConfig); - try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), STREAMS_CONFIG)) { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams); writeInputData(); - final ReadOnlyKeyValueStore stateStore = IntegrationTestUtils.getStore(1000_000L, TABLE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore()); // wait for the store to populate