diff --git a/examples/kafka-cluster/build.gradle b/examples/kafka-cluster/build.gradle index 54ae4eee8ff..623fcd66175 100644 --- a/examples/kafka-cluster/build.gradle +++ b/examples/kafka-cluster/build.gradle @@ -15,6 +15,7 @@ dependencies { testImplementation 'com.google.guava:guava:23.0' testImplementation 'ch.qos.logback:logback-classic:1.3.14' testImplementation 'org.junit.jupiter:junit-jupiter:5.11.0' + testImplementation 'org.awaitility:awaitility:4.2.2' } test { diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java index 9ab3f25e0ff..403d087af3e 100644 --- a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java +++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerCluster.java @@ -1,7 +1,7 @@ package com.example.kafkacluster; import lombok.SneakyThrows; -import org.rnorth.ducttape.unreliables.Unreliables; +import org.awaitility.Awaitility; import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; @@ -11,11 +11,12 @@ import java.time.Duration; import java.util.Collection; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import static org.assertj.core.api.Assertions.assertThat; + /** * Provides an easy way to launch a Kafka cluster with multiple brokers. */ @@ -87,10 +88,10 @@ public void start() { // sequential start to avoid resource contention on CI systems with weaker hardware brokers.forEach(GenericContainer::start); - Unreliables.retryUntilTrue( - 30, - TimeUnit.SECONDS, - () -> { + Awaitility + .await() + .atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> { Container.ExecResult result = this.zookeeper.execInContainer( "sh", @@ -101,9 +102,8 @@ public void start() { ); String brokers = result.getStdout(); - return brokers != null && brokers.split(",").length == this.brokersNum; - } - ); + assertThat(brokers.split(",")).hasSize(this.brokersNum); + }); } @Override diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java index 51221b5d56f..6a66cf0ca2e 100644 --- a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java +++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java @@ -13,8 +13,8 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; -import org.rnorth.ducttape.unreliables.Unreliables; import java.time.Duration; import java.util.Collection; @@ -100,24 +100,17 @@ protected void testKafkaFunctionality(String bootstrapServers, int partitions, i producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get(); - Unreliables.retryUntilTrue( - 10, - TimeUnit.SECONDS, - () -> { + Awaitility + .await() + .atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - if (records.isEmpty()) { - return false; - } - assertThat(records) .hasSize(1) .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value) .containsExactly(tuple(topicName, "testcontainers", "rulezzz")); - - return true; - } - ); + }); consumer.unsubscribe(); } diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerKraftCluster.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerKraftCluster.java index 4411cb14540..f42e3c5b1ba 100644 --- a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerKraftCluster.java +++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerKraftCluster.java @@ -1,7 +1,7 @@ package com.example.kafkacluster; import org.apache.kafka.common.Uuid; -import org.rnorth.ducttape.unreliables.Unreliables; +import org.awaitility.Awaitility; import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; @@ -11,10 +11,11 @@ import java.time.Duration; import java.util.Collection; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.assertj.core.api.Assertions.assertThat; + public class KafkaContainerKraftCluster implements Startable { private final int brokersNum; @@ -79,10 +80,10 @@ public void start() { // Needs to start all the brokers at once brokers.parallelStream().forEach(GenericContainer::start); - Unreliables.retryUntilTrue( - 30, - TimeUnit.SECONDS, - () -> { + Awaitility + .await() + .atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> { Container.ExecResult result = this.brokers.stream() .findFirst() @@ -94,9 +95,8 @@ public void start() { ); String brokers = result.getStdout().replace("\n", ""); - return brokers != null && Integer.valueOf(brokers) == this.brokersNum; - } - ); + assertThat(brokers).asInt().isEqualTo(this.brokersNum); + }); } @Override