Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
Expand Down Expand Up @@ -89,6 +90,10 @@ SmokeTestDriver.VerificationResult result() {

}

// In this test, we try to keep creating new stream, and closing the old one, to maintain only 3 streams alive.
// During the new stream added and old stream left, the stream process should still complete without issue.
// We set 2 timeout condition to fail the test before passing the verification:
// (1) 6 min timeout, (2) 30 tries of polling without getting any data
@Test
public void shouldWorkWithRebalance() throws InterruptedException {
Exit.setExitProcedure((statusCode, message) -> {
Expand All @@ -110,6 +115,8 @@ public void shouldWorkWithRebalance() throws InterruptedException {

final Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// decrease the session timeout so that we can trigger the rebalance soon after old client left closed
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);

// cycle out Streams instances as long as the test is running.
while (driver.isAlive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,14 @@ public void start(final Properties streamsProperties) {
try {
if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
} else {
System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
System.out.println(name + " started at " + Instant.now());
}
} catch (final InterruptedException e) {
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
e.printStackTrace(System.out);
}
System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
System.out.println(name + " started at " + Instant.now());
}

public void closeAsync() {
Expand All @@ -145,7 +146,7 @@ public void close() {
if (wasClosed && !uncaughtException) {
System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
} else if (wasClosed) {
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Got an uncaught exception");
} else {
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close in time.");
}
Expand Down