Skip to content

Commit

Permalink
doc: Updated docker and kafka feature (#777)
Browse files Browse the repository at this point in the history
  • Loading branch information
guillermocalvo authored Jul 20, 2023
1 parent c0759e7 commit 0869f9f
Show file tree
Hide file tree
Showing 15 changed files with 286 additions and 38 deletions.
3 changes: 3 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ managed-kafka-streams = { module = 'org.apache.kafka:kafka-streams', version.ref
opentracing-kafka-client = { module = 'io.opentracing.contrib:opentracing-kafka-client', version.ref = 'opentracing-kafka-client' }
opentracing-mock = { module = 'io.opentracing:opentracing-mock', version.ref = 'opentracing-mock' }

testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = 'testcontainers' }

zipkin-brave-kafka-clients = { module = 'io.zipkin.brave:brave-instrumentation-kafka-clients', version.ref = 'zipkin-brave-kafka-clients' }
awaitility = { module = 'org.awaitility:awaitility', version.ref = 'awaitility' }

Expand All @@ -51,6 +53,7 @@ micronaut-reactor = { module = "io.micronaut.reactor:micronaut-reactor-bom", ver
micronaut-rxjava2 = { module = "io.micronaut.rxjava2:micronaut-rxjava2-bom", version.ref = "micronaut-rxjava2" }
micronaut-serde = { module = "io.micronaut.serde:micronaut-serde-bom", version.ref = "micronaut-serde" }
micronaut-tracing = { module = "io.micronaut.tracing:micronaut-tracing-bom", version.ref = "micronaut-tracing" }
junit-jupiter-engine = { module = 'org.junit.jupiter:junit-jupiter-engine' }

micronaut-gradle-plugin = { module = "io.micronaut.gradle:micronaut-gradle-plugin", version.ref = "micronaut-gradle-plugin" }

4 changes: 3 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ include 'kafka-bom'
include 'kafka'
include 'kafka-streams'
include 'tests:tasks-sasl-plaintext'

include 'test-suite'
include 'test-suite-groovy'
include 'test-suite-kotlin'
2 changes: 1 addition & 1 deletion src/main/docs/guide/kafkaClient/kafkaClientScope.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ The previous example can be tested in JUnit with the following test:
include::{testskafka}/producer/inject/BookSenderTest.java[tags=test, indent=0]
----

<1> An embedded version of Kafka is used
<1> A Kafka docker container is used
<2> The `BookSender` is retrieved from the api:context.ApplicationContext[] and a `ProducerRecord` sent

By using the link:{kafkaapi}/org/apache/kafka/clients/producer/KafkaProducer.html[KafkaProducer] API directly you open up even more options if you require transactions (exactly-once delivery) or want control over when records are flushed etc.
13 changes: 13 additions & 0 deletions src/main/docs/guide/kafkaClient/kafkaDockerized.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
https://micronaut-projects.github.io/micronaut-test-resources/latest/guide/#modules-kafka[Micronaut Test Resources] simplifies running Kafka for local development and testing.

> Micronaut Test Resources Kafka support will automatically start a Kafka container and provide the value of the `kafka.bootstrap.servers` property.

https://micronaut.io/launch[Micronaut Launch] and CLI already apply Test Resources to your build when you https://micronaut.io/launch?features=kafka[select the `kafka` feature].

Micronaut Test Resources uses https://testcontainers.com[Test Containers] under the hood. If you prefer to use Test Containers directly, you can create a https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers[Singleton Container] and combine it with https://micronaut-projects.github.io/micronaut-test/latest/api/io/micronaut/test/support/TestPropertyProvider.html[Micronaut Test `TestPropertyProvider`]:

snippet::io.micronaut.kafka.docs.AbstractKafkaTest[]

And then test:

snippet::io.micronaut.kafka.docs.MyTest[]
35 changes: 0 additions & 35 deletions src/main/docs/guide/kafkaClient/kafkaEmbedded.adoc

This file was deleted.

2 changes: 1 addition & 1 deletion src/main/docs/guide/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ kafkaClient:
kafkaClientBatch: Sending Records in Batch
kafkaClientScope: Injecting Kafka Producer Beans
kafkaClientTx: Transactions
kafkaEmbedded: Embedding Kafka
kafkaDockerized: Running Kafka while testing and developing
kafkaListener:
title: Kafka Consumers Using @KafkaListener
kafkaListenerMethods: Defining @KafkaListener Methods
Expand Down
17 changes: 17 additions & 0 deletions test-suite-groovy/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
plugins {
groovy
}

dependencies {
testImplementation(platform(mn.micronaut.core.bom))
testCompileOnly(mn.micronaut.inject.groovy)
testImplementation(libs.testcontainers.kafka)
testImplementation(mnTest.micronaut.test.spock)
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.awaitility)
testImplementation(projects.micronautKafka)
}

tasks.withType<Test> {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.micronaut.kafka.docs

import io.micronaut.test.support.TestPropertyProvider
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName
import spock.lang.Specification

/**
* @see <a href="https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers">Singleton containers</a>
*/
abstract class AbstractKafkaTest extends Specification implements TestPropertyProvider {

static final KafkaContainer MY_KAFKA

static {
MY_KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))
MY_KAFKA.start()
}

@Override
Map<String, String> getProperties() {
["kafka.bootstrap.servers": MY_KAFKA.getBootstrapServers()]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.micronaut.kafka.docs

import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.context.annotation.*
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import spock.lang.Ignore

import static java.util.concurrent.TimeUnit.SECONDS
import static org.awaitility.Awaitility.await

@Property(name = "spec.name", value = "MyTest")
@MicronautTest
@Ignore("It hangs forever in the CI")
class MyTest extends AbstractKafkaTest {

@Inject
MyProducer producer
@Inject
MyConsumer consumer

void "test kafka running"() {
given:
String message = "hello"

when:
producer.produce(message)

then:
await().atMost(5, SECONDS).until(() -> consumer.consumed == message)

cleanup:
MY_KAFKA.stop()
}

@Requires(property = "spec.name", value = "MyTest")
@KafkaClient
static interface MyProducer {
@Topic("my-topic")
void produce(String message)
}

@Requires(property = "spec.name", value = "MyTest")
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
static class MyConsumer {
String consumed

@Topic("my-topic")
void consume(String message) {
consumed = message
}
}

}
18 changes: 18 additions & 0 deletions test-suite-kotlin/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
plugins {
id("org.jetbrains.kotlin.jvm") version "1.8.22"
id("org.jetbrains.kotlin.kapt") version "1.8.22"
}

dependencies {
kaptTest(platform(mn.micronaut.core.bom))
kaptTest(mn.micronaut.inject.java)
testImplementation(libs.testcontainers.kafka)
testImplementation(mnTest.micronaut.test.junit5)
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.awaitility)
testImplementation(projects.micronautKafka)
}

tasks.withType<Test> {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.micronaut.kafka.docs

import io.micronaut.test.support.TestPropertyProvider
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName
import java.util.*

/**
* @see <a href="https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers">Singleton containers</a>
*/
abstract class AbstractKafkaTest : TestPropertyProvider {

companion object {
var MY_KAFKA: KafkaContainer = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))

init {
MY_KAFKA.start()
}
}

override fun getProperties(): MutableMap<String, String> {
return Collections.singletonMap(
"kafka.bootstrap.servers", MY_KAFKA.getBootstrapServers()
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.micronaut.kafka.docs

import io.micronaut.configuration.kafka.annotation.*
import io.micronaut.context.annotation.*
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.*
import java.util.concurrent.*

@Property(name = "spec.name", value = "MyTest")
@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Disabled("It hangs forever in the CI")
internal class MyTest : AbstractKafkaTest() {
@Test
fun testKafkaRunning(producer: MyProducer, consumer: MyConsumer) {
val message = "hello"
producer.produce(message)
await().atMost(5, TimeUnit.SECONDS)
.until { consumer.consumed == message }
MY_KAFKA.stop()
}

@Requires(property = "spec.name", value = "MyTest")
@KafkaClient
interface MyProducer {
@Topic("my-topic")
fun produce(message: String)
}

@Requires(property = "spec.name", value = "MyTest")
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class MyConsumer {
var consumed: String? = null

@Topic("my-topic")
fun consume(message: String) {
consumed = message
}
}
}
17 changes: 17 additions & 0 deletions test-suite/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
plugins {
java
}

dependencies {
testAnnotationProcessor(platform(mn.micronaut.core.bom))
testAnnotationProcessor(mn.micronaut.inject.java)
testImplementation(libs.testcontainers.kafka)
testImplementation(mnTest.micronaut.test.junit5)
testRuntimeOnly(libs.junit.jupiter.engine)
testImplementation(libs.awaitility)
testImplementation(projects.micronautKafka)
}

tasks.withType<Test> {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.micronaut.kafka.docs;

import io.micronaut.test.support.TestPropertyProvider;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.*;

/**
* @see <a href="https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers">Singleton containers</a>
*/
abstract class AbstractKafkaTest implements TestPropertyProvider {

static final KafkaContainer MY_KAFKA;

static {
MY_KAFKA = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:latest"));
MY_KAFKA.start();
}

@Override
public Map<String, String> getProperties() {
return Collections.singletonMap(
"kafka.bootstrap.servers", MY_KAFKA.getBootstrapServers()
);
}
}
40 changes: 40 additions & 0 deletions test-suite/src/test/java/io/micronaut/kafka/docs/MyTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.micronaut.kafka.docs;

import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.context.annotation.*;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.*;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

@Property(name = "spec.name", value = "MyTest")
@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Disabled("It hangs forever in the CI")
class MyTest extends AbstractKafkaTest {
@Test
void testKafkaRunning(MyProducer producer, MyConsumer consumer) {
final String message = "hello";
producer.produce(message);
await().atMost(5, SECONDS).until(() -> message.equals(consumer.consumed));
MY_KAFKA.stop();
}

@Requires(property = "spec.name", value = "MyTest")
@KafkaClient
interface MyProducer {
@Topic("my-topic")
void produce(String message);
}

@Requires(property = "spec.name", value = "MyTest")
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
static class MyConsumer {
String consumed;
@Topic("my-topic")
public void consume(String message) {
consumed = message;
}
}
}

0 comments on commit 0869f9f

Please sign in to comment.