diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java index 1515debe84546..f980d7a13dfd3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java @@ -41,44 +41,37 @@ import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertTrue; +@Timeout(600) @Category(IntegrationTest.class) public class StateDirectoryIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException { CLUSTER.start(); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } - @Rule - public TestName testName = new TestName(); - @Test - public void testCleanUpStateDirIfEmpty() throws InterruptedException { - final String uniqueTestName = safeUniqueTestName(getClass(), testName); + public void testCleanUpStateDirIfEmpty(final TestInfo testInfo) throws InterruptedException { + final String uniqueTestName = safeUniqueTestName(getClass(), testInfo); // Create Topic final String input = uniqueTestName + "-input"; @@ -183,8 +176,8 @@ public void testCleanUpStateDirIfEmpty() throws InterruptedException { } @Test - public void testNotCleanUpStateDirIfNotEmpty() throws InterruptedException { - final String uniqueTestName = safeUniqueTestName(getClass(), testName); + public void testNotCleanUpStateDirIfNotEmpty(final TestInfo testInfo) throws InterruptedException { + final String uniqueTestName = safeUniqueTestName(getClass(), testInfo); // Create Topic final String input = uniqueTestName + "-input"; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java index 4cedf82bfccb3..9fb6b39c2cc6d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StateRestorationIntegrationTest.java @@ -33,13 +33,12 @@ import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestUtils; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.util.Arrays; @@ -47,11 +46,9 @@ import java.util.List; import java.util.Properties; +@Timeout(600) @Category({IntegrationTest.class}) public class StateRestorationIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); - private final StreamsBuilder builder = new StreamsBuilder(); private static final String APPLICATION_ID = "restoration-test-app"; @@ -63,19 +60,19 @@ public class StateRestorationIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException { CLUSTER.start(); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } private final MockTime mockTime = CLUSTER.time; - @Before + @BeforeEach public void setUp() throws Exception { final Properties props = new Properties(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java index 17df2b59a2176..c9646c7d87315 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java @@ -42,14 +42,13 @@ import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import java.io.IOException; import java.time.Duration; @@ -70,28 +69,25 @@ public class StoreUpgradeIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException { CLUSTER.start(); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } - @Rule - public TestName testName = new TestName(); - - @Before - public void createTopics() throws Exception { - inputStream = "input-stream-" + safeUniqueTestName(getClass(), testName); + @BeforeEach + public void createTopics(final TestInfo testInfo) throws Exception { + inputStream = "input-stream-" + safeUniqueTestName(getClass(), testInfo); CLUSTER.createTopic(inputStream); } - private Properties props() { + private Properties props(final TestInfo testInfo) { final Properties streamsConfiguration = new Properties(); - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(getClass(), testInfo); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); @@ -103,7 +99,7 @@ private Properties props() { return streamsConfiguration; } - @After + @AfterEach public void shutdown() { if (kafkaStreams != null) { kafkaStreams.close(Duration.ofSeconds(30L)); @@ -112,16 +108,17 @@ public void shutdown() { } @Test - public void shouldMigrateInMemoryKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception { - shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(false); + public void shouldMigrateInMemoryKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final TestInfo testInfo) throws Exception { + shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(false, testInfo); } @Test - public void shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception { - shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(true); + public void shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final TestInfo testInfo) throws Exception { + shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(true, testInfo); } - private void shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final boolean persistentStore) throws Exception { + private void shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final boolean persistentStore, + final TestInfo testInfo) throws Exception { final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder(); streamsBuilderForOldStore.addStateStore( @@ -132,7 +129,7 @@ private void shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final .stream(inputStream) .process(KeyValueProcessor::new, STORE_NAME); - final Properties props = props(); + final Properties props = props(testInfo); kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props); kafkaStreams.start(); @@ -231,7 +228,7 @@ private void shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final } @Test - public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception { + public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final TestInfo testInfo) throws Exception { final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder(); streamsBuilderForOldStore.addStateStore( @@ -242,7 +239,7 @@ public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws .stream(inputStream) .process(KeyValueProcessor::new, STORE_NAME); - final Properties props = props(); + final Properties props = props(testInfo); kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props); kafkaStreams.start(); @@ -499,7 +496,7 @@ private void processKeyValueAndVerifyCountWithTimestamp(final K key, } @Test - public void shouldMigrateInMemoryWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception { + public void shouldMigrateInMemoryWindowStoreToTimestampedWindowStoreUsingPapi(final TestInfo testInfo) throws Exception { final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder(); streamsBuilderForOldStore .addStateStore( @@ -532,11 +529,12 @@ public void shouldMigrateInMemoryWindowStoreToTimestampedWindowStoreUsingPapi() shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi( streamsBuilderForOldStore, streamsBuilderForNewStore, - false); + false, + testInfo); } @Test - public void shouldMigratePersistentWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception { + public void shouldMigratePersistentWindowStoreToTimestampedWindowStoreUsingPapi(final TestInfo testInfo) throws Exception { final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder(); streamsBuilderForOldStore @@ -569,13 +567,15 @@ public void shouldMigratePersistentWindowStoreToTimestampedWindowStoreUsingPapi( shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi( streamsBuilderForOldStore, streamsBuilderForNewStore, - true); + true, + testInfo); } private void shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(final StreamsBuilder streamsBuilderForOldStore, final StreamsBuilder streamsBuilderForNewStore, - final boolean persistentStore) throws Exception { - final Properties props = props(); + final boolean persistentStore, + final TestInfo testInfo) throws Exception { + final Properties props = props(testInfo); kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props); kafkaStreams.start(); @@ -711,7 +711,7 @@ private void shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(final Str } @Test - public void shouldProxyWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception { + public void shouldProxyWindowStoreToTimestampedWindowStoreUsingPapi(final TestInfo testInfo) throws Exception { final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder(); streamsBuilderForOldStore.addStateStore( @@ -726,7 +726,7 @@ public void shouldProxyWindowStoreToTimestampedWindowStoreUsingPapi() throws Exc .stream(inputStream) .process(WindowedProcessor::new, STORE_NAME); - final Properties props = props(); + final Properties props = props(testInfo); kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props); kafkaStreams.start(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java index c81ddcfa74a5e..7c010bab62466 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java @@ -41,15 +41,14 @@ import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.time.Duration; @@ -72,45 +71,51 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.fail; +@Timeout(600) @Category(IntegrationTest.class) @SuppressWarnings("deprecation") //Need to call the old handler, will remove those calls when the old handler is removed public class StreamsUncaughtExceptionHandlerIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L); - - @BeforeClass - public static void startCluster() throws IOException { - CLUSTER.start(); - } - - @AfterClass - public static void closeCluster() { - CLUSTER.stop(); - } public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30); private static final AtomicBoolean THROW_ERROR = new AtomicBoolean(true); private static final AtomicBoolean THROW_ILLEGAL_STATE_EXCEPTION = new AtomicBoolean(false); private static final AtomicBoolean THROW_ILLEGAL_ARGUMENT_EXCEPTION = new AtomicBoolean(false); - @Rule - public final TestName testName = new TestName(); - - private final String testId = safeUniqueTestName(getClass(), testName); - private final String appId = "appId_" + testId; - private final String inputTopic = "input" + testId; - private final String inputTopic2 = "input2" + testId; - private final String outputTopic = "output" + testId; - private final String outputTopic2 = "output2" + testId; + private final String testId; + private final String appId; + private final String inputTopic; + private final String inputTopic2; + private final String outputTopic; + private final String outputTopic2; private final StreamsBuilder builder = new StreamsBuilder(); private final List processorValueCollector = new ArrayList<>(); private final Properties properties = basicProps(); + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L); + + StreamsUncaughtExceptionHandlerIntegrationTest(final TestInfo testInfo) { + testId = safeUniqueTestName(getClass(), testInfo); + appId = "appId_" + testId; + inputTopic = "input" + testId; + inputTopic2 = "input2" + testId; + outputTopic = "output" + testId; + outputTopic2 = "output2" + testId; + } + + @BeforeAll + public static void startCluster() throws IOException { + CLUSTER.start(); + } + + @AfterAll + public static void closeCluster() { + CLUSTER.stop(); + } + private Properties basicProps() { return mkObjectProperties( mkMap( @@ -125,14 +130,14 @@ private Properties basicProps() { ); } - @Before + @BeforeEach public void setup() { IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic, inputTopic2, outputTopic, outputTopic2); final KStream stream = builder.stream(inputTopic); stream.process(() -> new ShutdownProcessor(processorValueCollector), Named.as("process")); } - @After + @AfterEach public void teardown() throws IOException { purgeLocalStreamsState(properties); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java index 6a9453e9926b5..15019e6532187 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUpgradeTestIntegrationTest.java @@ -22,12 +22,11 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.tests.StreamsUpgradeTest; import org.apache.kafka.test.IntegrationTest; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.time.Duration; @@ -44,20 +43,18 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +@Timeout(600) @Category(IntegrationTest.class) public class StreamsUpgradeTestIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException { CLUSTER.start(); IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 1, "data"); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index 35eee4764d487..5d0395d05a25a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -45,12 +45,11 @@ import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; import org.hamcrest.Matchers; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.util.Collections; @@ -81,23 +80,21 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; +@Timeout(600) @Category(IntegrationTest.class) public class SuppressionIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster( 1, mkProperties(mkMap()), 0L ); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException { CLUSTER.start(); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java index a706d74d66e9e..99385158e1945 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java @@ -31,13 +31,12 @@ import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.lang.reflect.Field; @@ -55,32 +54,27 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; +@Timeout(600) @Category(IntegrationTest.class) public class TaskAssignorIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException { CLUSTER.start(); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } - @Rule - public TestName testName = new TestName(); - // Just a dummy implementation so we can check the config public static final class MyTaskAssignor extends HighAvailabilityTaskAssignor implements TaskAssignor { } @SuppressWarnings("unchecked") @Test - public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, IllegalAccessException { + public void shouldProperlyConfigureTheAssignor(final TestInfo testInfo) throws NoSuchFieldException, IllegalAccessException { // This test uses reflection to check and make sure that all the expected configurations really // make it all the way to configure the task assignor. There's no other use case for being able // to extract all these fields, so reflection is a good choice until we find that the maintenance @@ -90,7 +84,7 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il // ensure these configurations wind up where they belong, and any number of future code changes // could break this change. - final String testId = safeUniqueTestName(getClass(), testName); + final String testId = safeUniqueTestName(getClass(), testInfo); final String appId = "appId_" + testId; final String inputTopic = "input" + testId; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java index 791ee58ff7bbd..4d3686d68091a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java @@ -32,15 +32,14 @@ import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; import java.io.IOException; import java.time.Duration; @@ -59,27 +58,22 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +@Timeout(600) @Category(IntegrationTest.class) public class TaskMetadataIntegrationTest { - @Rule - public Timeout globalTimeout = Timeout.seconds(600); - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException { CLUSTER.start(); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30); - @Rule - public TestName testName = new TestName(); - private String inputTopic; private static StreamsBuilder builder; private static Properties properties; @@ -88,9 +82,9 @@ public static void closeCluster() { private AtomicBoolean process; private AtomicBoolean commit; - @Before - public void setup() { - final String testId = safeUniqueTestName(getClass(), testName); + @BeforeEach + public void setup(final TestInfo testInfo) { + final String testId = safeUniqueTestName(getClass(), testInfo); appId = appIdPrefix + testId; inputTopic = "input" + testId; IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic); @@ -172,7 +166,7 @@ private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) throws Int return taskMetadataList.get().get(0); } - @After + @AfterEach public void teardown() throws IOException { purgeLocalStreamsState(properties); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index a5de09bbf6f38..6276572022183 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -58,6 +58,7 @@ import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.TestInfo; import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +67,7 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayList; @@ -236,6 +238,16 @@ public static String safeUniqueTestName(final Class testClass, final TestName .replace('=', '_'); } + public static String safeUniqueTestName(final Class testClass, final TestInfo testInfo) { + return (testClass.getSimpleName() + testInfo.getTestMethod().map(Method::getName)) + .replace(':', '_') + .replace('.', '_') + .replace('[', '_') + .replace(']', '_') + .replace(' ', '_') + .replace('=', '_'); + } + /** * Removes local state stores. Useful to reset state in-between integration test runs. * diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index 156b553455d47..dfbdf843f3761 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -31,12 +31,14 @@ import org.apache.kafka.streams.kstream.StreamJoined; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockApiProcessor; import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; import org.apache.kafka.test.StreamsTestUtils; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import java.time.Duration; import java.time.Instant; @@ -48,8 +50,9 @@ import static java.time.Duration.ZERO; import static java.time.Duration.ofMillis; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +@Category(IntegrationTest.class) public class KStreamKStreamLeftJoinTest { private final static KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0]; @@ -58,7 +61,7 @@ public class KStreamKStreamLeftJoinTest { private final Consumed consumed = Consumed.with(Serdes.Integer(), Serdes.String()); private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); - @BeforeClass + @BeforeAll public static void beforeClass() { PROPS.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java index 8133e25ec4be9..26b37cf3a202f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java @@ -36,12 +36,14 @@ import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.test.TestRecord; +import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockApiProcessor; import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; import org.apache.kafka.test.StreamsTestUtils; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import java.time.Duration; import java.time.Instant; @@ -53,15 +55,16 @@ import static java.time.Duration.ZERO; import static java.time.Duration.ofMillis; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +@Category(IntegrationTest.class) public class KStreamKStreamOuterJoinTest { private final String topic1 = "topic1"; private final String topic2 = "topic2"; private final Consumed consumed = Consumed.with(Serdes.Integer(), Serdes.String()); private final static Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); - @BeforeClass + @BeforeAll public static void beforeClass() { PROPS.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java index 017cccf69d8ae..92cfdf1e24891 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java @@ -28,14 +28,13 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import java.io.IOException; import java.util.Properties; @@ -56,36 +55,33 @@ public class HandlingSourceTopicDeletionIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - @BeforeClass + @BeforeAll public static void startCluster() throws IOException { CLUSTER.start(); } - @AfterClass + @AfterAll public static void closeCluster() { CLUSTER.stop(); } - @Rule - public TestName testName = new TestName(); - - @Before + @BeforeEach public void before() throws InterruptedException { CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC); } - @After + @AfterEach public void after() throws InterruptedException { CLUSTER.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC); } @Test - public void shouldThrowErrorAfterSourceTopicDeleted() throws InterruptedException { + public void shouldThrowErrorAfterSourceTopicDeleted(final TestInfo testInfo) throws InterruptedException { final StreamsBuilder builder = new StreamsBuilder(); builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.String())) .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), Serdes.String())); - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(getClass(), testInfo); final String appId = "app-" + safeTestName; final Properties streamsConfiguration = new Properties();