Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,44 +41,37 @@
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;


import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Timeout(600)
@Category(IntegrationTest.class)
public class StateDirectoryIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);

public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

@BeforeClass
@BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}

@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}

@Rule
public TestName testName = new TestName();

@Test
public void testCleanUpStateDirIfEmpty() throws InterruptedException {
final String uniqueTestName = safeUniqueTestName(getClass(), testName);
public void testCleanUpStateDirIfEmpty(final TestInfo testInfo) throws InterruptedException {
final String uniqueTestName = safeUniqueTestName(getClass(), testInfo);

// Create Topic
final String input = uniqueTestName + "-input";
Expand Down Expand Up @@ -183,8 +176,8 @@ public void testCleanUpStateDirIfEmpty() throws InterruptedException {
}

@Test
public void testNotCleanUpStateDirIfNotEmpty() throws InterruptedException {
final String uniqueTestName = safeUniqueTestName(getClass(), testName);
public void testNotCleanUpStateDirIfNotEmpty(final TestInfo testInfo) throws InterruptedException {
final String uniqueTestName = safeUniqueTestName(getClass(), testInfo);

// Create Topic
final String input = uniqueTestName + "-input";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,22 @@
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

@Timeout(600)
@Category({IntegrationTest.class})
public class StateRestorationIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);

private final StreamsBuilder builder = new StreamsBuilder();

private static final String APPLICATION_ID = "restoration-test-app";
Expand All @@ -63,19 +60,19 @@ public class StateRestorationIntegrationTest {

public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

@BeforeClass
@BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}

@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}

private final MockTime mockTime = CLUSTER.time;

@Before
@BeforeEach
public void setUp() throws Exception {
final Properties props = new Properties();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

import java.io.IOException;
import java.time.Duration;
Expand All @@ -70,28 +69,25 @@ public class StoreUpgradeIntegrationTest {

public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

@BeforeClass
@BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}

@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}

@Rule
public TestName testName = new TestName();

@Before
public void createTopics() throws Exception {
inputStream = "input-stream-" + safeUniqueTestName(getClass(), testName);
@BeforeEach
public void createTopics(final TestInfo testInfo) throws Exception {
inputStream = "input-stream-" + safeUniqueTestName(getClass(), testInfo);
CLUSTER.createTopic(inputStream);
}

private Properties props() {
private Properties props(final TestInfo testInfo) {
final Properties streamsConfiguration = new Properties();
final String safeTestName = safeUniqueTestName(getClass(), testName);
final String safeTestName = safeUniqueTestName(getClass(), testInfo);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
Expand All @@ -103,7 +99,7 @@ private Properties props() {
return streamsConfiguration;
}

@After
@AfterEach
public void shutdown() {
if (kafkaStreams != null) {
kafkaStreams.close(Duration.ofSeconds(30L));
Expand All @@ -112,16 +108,17 @@ public void shutdown() {
}

@Test
public void shouldMigrateInMemoryKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception {
shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(false);
public void shouldMigrateInMemoryKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final TestInfo testInfo) throws Exception {
shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(false, testInfo);
}

@Test
public void shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception {
shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(true);
public void shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final TestInfo testInfo) throws Exception {
shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(true, testInfo);
}

private void shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final boolean persistentStore) throws Exception {
private void shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final boolean persistentStore,
final TestInfo testInfo) throws Exception {
final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();

streamsBuilderForOldStore.addStateStore(
Expand All @@ -132,7 +129,7 @@ private void shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final
.<Integer, Integer>stream(inputStream)
.process(KeyValueProcessor::new, STORE_NAME);

final Properties props = props();
final Properties props = props(testInfo);
kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props);
kafkaStreams.start();

Expand Down Expand Up @@ -231,7 +228,7 @@ private void shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final
}

@Test
public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception {
public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi(final TestInfo testInfo) throws Exception {
final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();

streamsBuilderForOldStore.addStateStore(
Expand All @@ -242,7 +239,7 @@ public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws
.<Integer, Integer>stream(inputStream)
.process(KeyValueProcessor::new, STORE_NAME);

final Properties props = props();
final Properties props = props(testInfo);
kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props);
kafkaStreams.start();

Expand Down Expand Up @@ -499,7 +496,7 @@ private <K, V> void processKeyValueAndVerifyCountWithTimestamp(final K key,
}

@Test
public void shouldMigrateInMemoryWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception {
public void shouldMigrateInMemoryWindowStoreToTimestampedWindowStoreUsingPapi(final TestInfo testInfo) throws Exception {
final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
streamsBuilderForOldStore
.addStateStore(
Expand Down Expand Up @@ -532,11 +529,12 @@ public void shouldMigrateInMemoryWindowStoreToTimestampedWindowStoreUsingPapi()
shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(
streamsBuilderForOldStore,
streamsBuilderForNewStore,
false);
false,
testInfo);
}

@Test
public void shouldMigratePersistentWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception {
public void shouldMigratePersistentWindowStoreToTimestampedWindowStoreUsingPapi(final TestInfo testInfo) throws Exception {
final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();

streamsBuilderForOldStore
Expand Down Expand Up @@ -569,13 +567,15 @@ public void shouldMigratePersistentWindowStoreToTimestampedWindowStoreUsingPapi(
shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(
streamsBuilderForOldStore,
streamsBuilderForNewStore,
true);
true,
testInfo);
}

private void shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(final StreamsBuilder streamsBuilderForOldStore,
final StreamsBuilder streamsBuilderForNewStore,
final boolean persistentStore) throws Exception {
final Properties props = props();
final boolean persistentStore,
final TestInfo testInfo) throws Exception {
final Properties props = props(testInfo);
kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props);
kafkaStreams.start();

Expand Down Expand Up @@ -711,7 +711,7 @@ private void shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(final Str
}

@Test
public void shouldProxyWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception {
public void shouldProxyWindowStoreToTimestampedWindowStoreUsingPapi(final TestInfo testInfo) throws Exception {
final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();

streamsBuilderForOldStore.addStateStore(
Expand All @@ -726,7 +726,7 @@ public void shouldProxyWindowStoreToTimestampedWindowStoreUsingPapi() throws Exc
.<Integer, Integer>stream(inputStream)
.process(WindowedProcessor::new, STORE_NAME);

final Properties props = props();
final Properties props = props(testInfo);
kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props);
kafkaStreams.start();

Expand Down
Loading