diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 7e4eaef8f..90c2d26e7 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -42,6 +42,8 @@ managed-kafka-streams = { module = 'org.apache.kafka:kafka-streams', version.ref opentracing-kafka-client = { module = 'io.opentracing.contrib:opentracing-kafka-client', version.ref = 'opentracing-kafka-client' } opentracing-mock = { module = 'io.opentracing:opentracing-mock', version.ref = 'opentracing-mock' } +testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = 'testcontainers' } + zipkin-brave-kafka-clients = { module = 'io.zipkin.brave:brave-instrumentation-kafka-clients', version.ref = 'zipkin-brave-kafka-clients' } awaitility = { module = 'org.awaitility:awaitility', version.ref = 'awaitility' } @@ -51,6 +53,7 @@ micronaut-reactor = { module = "io.micronaut.reactor:micronaut-reactor-bom", ver micronaut-rxjava2 = { module = "io.micronaut.rxjava2:micronaut-rxjava2-bom", version.ref = "micronaut-rxjava2" } micronaut-serde = { module = "io.micronaut.serde:micronaut-serde-bom", version.ref = "micronaut-serde" } micronaut-tracing = { module = "io.micronaut.tracing:micronaut-tracing-bom", version.ref = "micronaut-tracing" } +junit-jupiter-engine = { module = 'org.junit.jupiter:junit-jupiter-engine' } micronaut-gradle-plugin = { module = "io.micronaut.gradle:micronaut-gradle-plugin", version.ref = "micronaut-gradle-plugin" } diff --git a/settings.gradle b/settings.gradle index df792a4a4..5e4b58a82 100644 --- a/settings.gradle +++ b/settings.gradle @@ -28,4 +28,6 @@ include 'kafka-bom' include 'kafka' include 'kafka-streams' include 'tests:tasks-sasl-plaintext' - +include 'test-suite' +include 'test-suite-groovy' +include 'test-suite-kotlin' diff --git a/src/main/docs/guide/kafkaClient/kafkaClientScope.adoc b/src/main/docs/guide/kafkaClient/kafkaClientScope.adoc index 9e29c49f3..50cfe5fce 100644 --- a/src/main/docs/guide/kafkaClient/kafkaClientScope.adoc +++ b/src/main/docs/guide/kafkaClient/kafkaClientScope.adoc @@ -24,7 +24,7 @@ The previous example can be tested in JUnit with the following test: include::{testskafka}/producer/inject/BookSenderTest.java[tags=test, indent=0] ---- -<1> An embedded version of Kafka is used +<1> A Kafka docker container is used <2> The `BookSender` is retrieved from the api:context.ApplicationContext[] and a `ProducerRecord` sent By using the link:{kafkaapi}/org/apache/kafka/clients/producer/KafkaProducer.html[KafkaProducer] API directly you open up even more options if you require transactions (exactly-once delivery) or want control over when records are flushed etc. \ No newline at end of file diff --git a/src/main/docs/guide/kafkaClient/kafkaDockerized.adoc b/src/main/docs/guide/kafkaClient/kafkaDockerized.adoc new file mode 100644 index 000000000..462ddbe2a --- /dev/null +++ b/src/main/docs/guide/kafkaClient/kafkaDockerized.adoc @@ -0,0 +1,13 @@ +https://micronaut-projects.github.io/micronaut-test-resources/latest/guide/#modules-kafka[Micronaut Test Resources] simplifies running Kafka for local development and testing. + +> Micronaut Test Resources Kafka support will automatically start a Kafka container and provide the value of the `kafka.bootstrap.servers` property. + +https://micronaut.io/launch[Micronaut Launch] and CLI already apply Test Resources to your build when you https://micronaut.io/launch?features=kafka[select the `kafka` feature]. + +Micronaut Test Resources uses https://testcontainers.com[Test Containers] under the hood. If you prefer to use Test Containers directly, you can create a https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers[Singleton Container] and combine it with https://micronaut-projects.github.io/micronaut-test/latest/api/io/micronaut/test/support/TestPropertyProvider.html[Micronaut Test `TestPropertyProvider`]: + +snippet::io.micronaut.kafka.docs.AbstractKafkaTest[] + +And then test: + +snippet::io.micronaut.kafka.docs.MyTest[] diff --git a/src/main/docs/guide/kafkaClient/kafkaEmbedded.adoc b/src/main/docs/guide/kafkaClient/kafkaEmbedded.adoc deleted file mode 100644 index 9f95f2717..000000000 --- a/src/main/docs/guide/kafkaClient/kafkaEmbedded.adoc +++ /dev/null @@ -1,35 +0,0 @@ -The previous section introduced the ability to embed Kafka for your tests. This is possible in Micronaut by specifying the `kafka.embedded.enabled` setting to `true` and adding the following dependencies to your test classpath: - -.Kafka Gradle Test Dependencies -[source,groovy,subs="attributes"] ----- -testImplementation 'org.apache.kafka:kafka-clients:{kafka-version}:test' -testImplementation 'org.apache.kafka:kafka_2.12:{kafka-version}' -testImplementation 'org.apache.kafka:kafka_2.12:{kafka-version}:test' ----- - -or - -.Kafka Maven Test Dependencies -[source,xml,subs="specialchars,attributes"] ----- - - org.apache.kafka - kafka-clients - {kafka-version} - test - - - org.apache.kafka - kafka_2.12 - {kafka-version} - - - org.apache.kafka - kafka_2.12 - {kafka-version} - test - ----- - -Note that because of the distributed nature of Kafka it is relatively slow to startup so it is generally better to do the initialization with `@BeforeClass` (or `setupSpec` in Spock) and have a large number of test methods rather than many test classes otherwise your test execution performance will suffer. \ No newline at end of file diff --git a/src/main/docs/guide/toc.yml b/src/main/docs/guide/toc.yml index ebf73813c..676f705d9 100644 --- a/src/main/docs/guide/toc.yml +++ b/src/main/docs/guide/toc.yml @@ -9,7 +9,7 @@ kafkaClient: kafkaClientBatch: Sending Records in Batch kafkaClientScope: Injecting Kafka Producer Beans kafkaClientTx: Transactions - kafkaEmbedded: Embedding Kafka + kafkaDockerized: Running Kafka while testing and developing kafkaListener: title: Kafka Consumers Using @KafkaListener kafkaListenerMethods: Defining @KafkaListener Methods diff --git a/test-suite-groovy/build.gradle.kts b/test-suite-groovy/build.gradle.kts new file mode 100644 index 000000000..de5937b53 --- /dev/null +++ b/test-suite-groovy/build.gradle.kts @@ -0,0 +1,17 @@ +plugins { + groovy +} + +dependencies { + testImplementation(platform(mn.micronaut.core.bom)) + testCompileOnly(mn.micronaut.inject.groovy) + testImplementation(libs.testcontainers.kafka) + testImplementation(mnTest.micronaut.test.spock) + testRuntimeOnly(libs.junit.jupiter.engine) + testImplementation(libs.awaitility) + testImplementation(projects.micronautKafka) +} + +tasks.withType { + useJUnitPlatform() +} diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/AbstractKafkaTest.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/AbstractKafkaTest.groovy new file mode 100644 index 000000000..e5dc10a0d --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/AbstractKafkaTest.groovy @@ -0,0 +1,24 @@ +package io.micronaut.kafka.docs + +import io.micronaut.test.support.TestPropertyProvider +import org.testcontainers.containers.KafkaContainer +import org.testcontainers.utility.DockerImageName +import spock.lang.Specification + +/** + * @see Singleton containers + */ +abstract class AbstractKafkaTest extends Specification implements TestPropertyProvider { + + static final KafkaContainer MY_KAFKA + + static { + MY_KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")) + MY_KAFKA.start() + } + + @Override + Map getProperties() { + ["kafka.bootstrap.servers": MY_KAFKA.getBootstrapServers()] + } +} diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/MyTest.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/MyTest.groovy new file mode 100644 index 000000000..704ca23a4 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/MyTest.groovy @@ -0,0 +1,54 @@ +package io.micronaut.kafka.docs + +import io.micronaut.configuration.kafka.annotation.* +import io.micronaut.context.annotation.* +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import jakarta.inject.Inject +import spock.lang.Ignore + +import static java.util.concurrent.TimeUnit.SECONDS +import static org.awaitility.Awaitility.await + +@Property(name = "spec.name", value = "MyTest") +@MicronautTest +@Ignore("It hangs forever in the CI") +class MyTest extends AbstractKafkaTest { + + @Inject + MyProducer producer + @Inject + MyConsumer consumer + + void "test kafka running"() { + given: + String message = "hello" + + when: + producer.produce(message) + + then: + await().atMost(5, SECONDS).until(() -> consumer.consumed == message) + + cleanup: + MY_KAFKA.stop() + } + + @Requires(property = "spec.name", value = "MyTest") + @KafkaClient + static interface MyProducer { + @Topic("my-topic") + void produce(String message) + } + + @Requires(property = "spec.name", value = "MyTest") + @KafkaListener(offsetReset = OffsetReset.EARLIEST) + static class MyConsumer { + String consumed + + @Topic("my-topic") + void consume(String message) { + consumed = message + } + } + +} diff --git a/test-suite-kotlin/build.gradle.kts b/test-suite-kotlin/build.gradle.kts new file mode 100644 index 000000000..1b12188ec --- /dev/null +++ b/test-suite-kotlin/build.gradle.kts @@ -0,0 +1,18 @@ +plugins { + id("org.jetbrains.kotlin.jvm") version "1.8.22" + id("org.jetbrains.kotlin.kapt") version "1.8.22" +} + +dependencies { + kaptTest(platform(mn.micronaut.core.bom)) + kaptTest(mn.micronaut.inject.java) + testImplementation(libs.testcontainers.kafka) + testImplementation(mnTest.micronaut.test.junit5) + testRuntimeOnly(libs.junit.jupiter.engine) + testImplementation(libs.awaitility) + testImplementation(projects.micronautKafka) +} + +tasks.withType { + useJUnitPlatform() +} diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/AbstractKafkaTest.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/AbstractKafkaTest.kt new file mode 100644 index 000000000..79145ddb5 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/AbstractKafkaTest.kt @@ -0,0 +1,26 @@ +package io.micronaut.kafka.docs + +import io.micronaut.test.support.TestPropertyProvider +import org.testcontainers.containers.KafkaContainer +import org.testcontainers.utility.DockerImageName +import java.util.* + +/** + * @see Singleton containers + */ +abstract class AbstractKafkaTest : TestPropertyProvider { + + companion object { + var MY_KAFKA: KafkaContainer = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")) + + init { + MY_KAFKA.start() + } + } + + override fun getProperties(): MutableMap { + return Collections.singletonMap( + "kafka.bootstrap.servers", MY_KAFKA.getBootstrapServers() + ) + } +} diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/MyTest.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/MyTest.kt new file mode 100644 index 000000000..a91995654 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/MyTest.kt @@ -0,0 +1,41 @@ +package io.micronaut.kafka.docs + +import io.micronaut.configuration.kafka.annotation.* +import io.micronaut.context.annotation.* +import io.micronaut.test.extensions.junit5.annotation.MicronautTest +import org.awaitility.Awaitility.await +import org.junit.jupiter.api.* +import java.util.concurrent.* + +@Property(name = "spec.name", value = "MyTest") +@MicronautTest +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Disabled("It hangs forever in the CI") +internal class MyTest : AbstractKafkaTest() { + @Test + fun testKafkaRunning(producer: MyProducer, consumer: MyConsumer) { + val message = "hello" + producer.produce(message) + await().atMost(5, TimeUnit.SECONDS) + .until { consumer.consumed == message } + MY_KAFKA.stop() + } + + @Requires(property = "spec.name", value = "MyTest") + @KafkaClient + interface MyProducer { + @Topic("my-topic") + fun produce(message: String) + } + + @Requires(property = "spec.name", value = "MyTest") + @KafkaListener(offsetReset = OffsetReset.EARLIEST) + class MyConsumer { + var consumed: String? = null + + @Topic("my-topic") + fun consume(message: String) { + consumed = message + } + } +} diff --git a/test-suite/build.gradle.kts b/test-suite/build.gradle.kts new file mode 100644 index 000000000..6c259cba7 --- /dev/null +++ b/test-suite/build.gradle.kts @@ -0,0 +1,17 @@ +plugins { + java +} + +dependencies { + testAnnotationProcessor(platform(mn.micronaut.core.bom)) + testAnnotationProcessor(mn.micronaut.inject.java) + testImplementation(libs.testcontainers.kafka) + testImplementation(mnTest.micronaut.test.junit5) + testRuntimeOnly(libs.junit.jupiter.engine) + testImplementation(libs.awaitility) + testImplementation(projects.micronautKafka) +} + +tasks.withType { + useJUnitPlatform() +} diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/AbstractKafkaTest.java b/test-suite/src/test/java/io/micronaut/kafka/docs/AbstractKafkaTest.java new file mode 100644 index 000000000..a4097602c --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/AbstractKafkaTest.java @@ -0,0 +1,28 @@ +package io.micronaut.kafka.docs; + +import io.micronaut.test.support.TestPropertyProvider; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +import java.util.*; + +/** + * @see Singleton containers + */ +abstract class AbstractKafkaTest implements TestPropertyProvider { + + static final KafkaContainer MY_KAFKA; + + static { + MY_KAFKA = new KafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka:latest")); + MY_KAFKA.start(); + } + + @Override + public Map getProperties() { + return Collections.singletonMap( + "kafka.bootstrap.servers", MY_KAFKA.getBootstrapServers() + ); + } +} diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/MyTest.java b/test-suite/src/test/java/io/micronaut/kafka/docs/MyTest.java new file mode 100644 index 000000000..d11e28786 --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/MyTest.java @@ -0,0 +1,40 @@ +package io.micronaut.kafka.docs; + +import io.micronaut.configuration.kafka.annotation.*; +import io.micronaut.context.annotation.*; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import org.junit.jupiter.api.*; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; + +@Property(name = "spec.name", value = "MyTest") +@MicronautTest +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Disabled("It hangs forever in the CI") +class MyTest extends AbstractKafkaTest { + @Test + void testKafkaRunning(MyProducer producer, MyConsumer consumer) { + final String message = "hello"; + producer.produce(message); + await().atMost(5, SECONDS).until(() -> message.equals(consumer.consumed)); + MY_KAFKA.stop(); + } + + @Requires(property = "spec.name", value = "MyTest") + @KafkaClient + interface MyProducer { + @Topic("my-topic") + void produce(String message); + } + + @Requires(property = "spec.name", value = "MyTest") + @KafkaListener(offsetReset = OffsetReset.EARLIEST) + static class MyConsumer { + String consumed; + @Topic("my-topic") + public void consume(String message) { + consumed = message; + } + } +}