Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -62,12 +61,11 @@
import static java.time.Duration.ofSeconds;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;

@Timeout(600)
@Category({IntegrationTest.class})
public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private final static int NUM_BROKERS = 1;

public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
Expand All @@ -88,7 +86,7 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
private final static Properties PRODUCER_CONFIG_2 = new Properties();
private final static Properties PRODUCER_CONFIG_3 = new Properties();

@BeforeClass
@BeforeAll
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
//Use multiple partitions to ensure distribution of keys.
Expand Down Expand Up @@ -149,20 +147,20 @@ public static void startCluster() throws IOException, InterruptedException {
CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
}

@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}

@Before
@BeforeEach
public void before() throws IOException {
final String stateDirBasePath = TestUtils.tempDirectory().getPath();
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-1");
streamsConfigTwo.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-2");
streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-3");
}

@After
@AfterEach
public void after() throws IOException {
if (streams != null) {
streams.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,14 @@
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -55,35 +54,30 @@

import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;

@Timeout(600)
@Category({IntegrationTest.class})
public class KTableKTableForeignKeyJoinDistributedTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final String LEFT_TABLE = "left_table";
private static final String RIGHT_TABLE = "right_table";
private static final String OUTPUT = "output-topic";
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

@BeforeClass
@BeforeAll
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
}

@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}

private static final Properties CONSUMER_CONFIG = new Properties();

@Rule
public TestName testName = new TestName();


private static final String INPUT_TOPIC = "input-topic";

private KafkaStreams client1;
Expand All @@ -92,7 +86,7 @@ public static void closeCluster() {
private volatile boolean client1IsOk = false;
private volatile boolean client2IsOk = false;

@Before
@BeforeEach
public void setupTopics() throws InterruptedException {
CLUSTER.createTopic(LEFT_TABLE, 1, 1);
CLUSTER.createTopic(RIGHT_TABLE, 1, 1);
Expand Down Expand Up @@ -125,16 +119,16 @@ public void setupTopics() throws InterruptedException {
CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
}

@After
@AfterEach
public void after() {
client1.close();
client2.close();
quietlyCleanStateAfterTest(CLUSTER, client1);
quietlyCleanStateAfterTest(CLUSTER, client2);
}

public Properties getStreamsConfiguration() {
final String safeTestName = safeUniqueTestName(getClass(), testName);
public Properties getStreamsConfiguration(final TestInfo testInfo) {
final String safeTestName = safeUniqueTestName(getClass(), testInfo);
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
Expand Down Expand Up @@ -164,9 +158,9 @@ private void configureBuilder(final StreamsBuilder builder) {
}

@Test
public void shouldBeInitializedWithDefaultSerde() throws Exception {
final Properties streamsConfiguration1 = getStreamsConfiguration();
final Properties streamsConfiguration2 = getStreamsConfiguration();
public void shouldBeInitializedWithDefaultSerde(final TestInfo testInfo) throws Exception {
final Properties streamsConfiguration1 = getStreamsConfiguration(testInfo);
final Properties streamsConfiguration2 = getStreamsConfiguration(testInfo);

//Each streams client needs to have it's own StreamsBuilder in order to simulate
//a truly distributed run
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,14 @@
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -73,20 +72,19 @@
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Timeout(600)
@Category({IntegrationTest.class})
public class LagFetchIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

@BeforeClass
@BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}

@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
Expand All @@ -102,12 +100,9 @@ public static void closeCluster() {
private String outputTopicName;
private String stateStoreName;

@Rule
public TestName testName = new TestName();

@Before
public void before() {
final String safeTestName = safeUniqueTestName(getClass(), testName);
@BeforeEach
public void before(final TestInfo testInfo) {
final String safeTestName = safeUniqueTestName(getClass(), testInfo);
inputTopicName = "input-topic-" + safeTestName;
outputTopicName = "output-topic-" + safeTestName;
stateStoreName = "lagfetch-test-store" + safeTestName;
Expand All @@ -128,7 +123,7 @@ public void before() {
consumerConfiguration.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
}

@After
@AfterEach
public void shutdown() throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
Expand Down Expand Up @@ -318,7 +313,7 @@ public void shouldFetchLagsDuringRestoration() throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(f -> assertTrue("Some state " + f + " could not be deleted", f.delete()));
.forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted"));

// wait till the lag goes down to 0
final KafkaStreams restartedStreams = new KafkaStreams(builder.build(), props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,14 @@
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

import java.io.IOException;
import java.time.Duration;
Expand All @@ -65,23 +63,24 @@
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

@Timeout(600)
@Category({IntegrationTest.class})
@SuppressWarnings("deprecation")
public class MetricsIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final int NUM_THREADS = 2;

public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

@BeforeClass
@BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}

@AfterClass
@AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
Expand Down Expand Up @@ -242,15 +241,12 @@ public static void closeCluster() {

private String appId;

@Rule
public TestName testName = new TestName();

@Before
public void before() throws InterruptedException {
@BeforeEach
public void before(final TestInfo testInfo) throws InterruptedException {
builder = new StreamsBuilder();
CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);

final String safeTestName = safeUniqueTestName(getClass(), testName);
final String safeTestName = safeUniqueTestName(getClass(), testInfo);
appId = "app-" + safeTestName;

streamsConfiguration = new Properties();
Expand All @@ -263,7 +259,7 @@ public void before() throws InterruptedException {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
}

@After
@AfterEach
public void after() throws InterruptedException {
CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
}
Expand Down Expand Up @@ -752,9 +748,9 @@ private void checkMetricByName(final List<Metric> listMetric, final String metri
final List<Metric> metrics = listMetric.stream()
.filter(m -> m.metricName().name().equals(metricName))
.collect(Collectors.toList());
Assert.assertEquals("Size of metrics of type:'" + metricName + "' must be equal to " + numMetric + " but it's equal to " + metrics.size(), numMetric, metrics.size());
assertEquals(numMetric, metrics.size(), "Size of metrics of type:'" + metricName + "' must be equal to " + numMetric + " but it's equal to " + metrics.size());
for (final Metric m : metrics) {
Assert.assertNotNull("Metric:'" + m.metricName() + "' must be not null", m.metricValue());
assertNotNull(m.metricValue(), "Metric:'" + m.metricName() + "' must be not null");
}
}
}
Loading