diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java index bf3889cb0f987..dc10f7a0a5785 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.integration; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; @@ -89,6 +90,10 @@ SmokeTestDriver.VerificationResult result() { } + // In this test, we try to keep creating new stream, and closing the old one, to maintain only 3 streams alive. + // During the new stream added and old stream left, the stream process should still complete without issue. + // We set 2 timeout condition to fail the test before passing the verification: + // (1) 6 min timeout, (2) 30 tries of polling without getting any data @Test public void shouldWorkWithRebalance() throws InterruptedException { Exit.setExitProcedure((statusCode, message) -> { @@ -110,6 +115,8 @@ public void shouldWorkWithRebalance() throws InterruptedException { final Properties props = new Properties(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + // decrease the session timeout so that we can trigger the rebalance soon after old client left closed + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); // cycle out Streams instances as long as the test is running. while (driver.isAlive()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index 2e53d580d5292..e39b3c0a741b0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -126,13 +126,14 @@ public void start(final Properties streamsProperties) { try { if (!countDownLatch.await(1, TimeUnit.MINUTES)) { System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute"); + } else { + System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED"); + System.out.println(name + " started at " + Instant.now()); } } catch (final InterruptedException e) { System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e); e.printStackTrace(System.out); } - System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED"); - System.out.println(name + " started at " + Instant.now()); } public void closeAsync() { @@ -145,7 +146,7 @@ public void close() { if (wasClosed && !uncaughtException) { System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED"); } else if (wasClosed) { - System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Got an uncaught exception"); } else { System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close in time."); }