diff --git a/src/main/docs/guide/kafkaClient/kafkaClientConfiguration.adoc b/src/main/docs/guide/kafkaClient/kafkaClientConfiguration.adoc index 2c913d1e0..427aa8cfa 100644 --- a/src/main/docs/guide/kafkaClient/kafkaClientConfiguration.adoc +++ b/src/main/docs/guide/kafkaClient/kafkaClientConfiguration.adoc @@ -21,10 +21,8 @@ Any property in the link:{kafkaapi}/org/apache/kafka/clients/producer/ProducerCo To configure different properties for each client, you should set a `@KafkaClient` id using the annotation: .Using a Client ID -[source,java] ----- -@KafkaClient("product-client") ----- + +snippet::io.micronaut.kafka.docs.producer.config.ClientIdClient[tags="annotation"] This serves 2 purposes. Firstly it sets the value of the `client.id` setting used to build the `Producer`. Secondly, it allows you to apply per producer configuration in `application.yml`: diff --git a/src/main/docs/guide/kafkaClient/kafkaClientMethods.adoc b/src/main/docs/guide/kafkaClient/kafkaClientMethods.adoc index 50135868f..ba6513f7f 100644 --- a/src/main/docs/guide/kafkaClient/kafkaClientMethods.adoc +++ b/src/main/docs/guide/kafkaClient/kafkaClientMethods.adoc @@ -4,11 +4,8 @@ The Kafka key can be specified by providing a parameter annotated with `@KafkaKe The value to send is resolved by selecting the argument annotated with https://docs.micronaut.io/latest/api/io/micronaut/messaging/annotation/MessageBody.html[@MessageBody], otherwise the first argument with no specific binding annotation is used. For example: -[source,java] ----- -@Topic("my-products") -void sendProduct(@KafkaKey String brand, String name); ----- + +snippet::io.micronaut.kafka.docs.producer.methods.ProductClient[tags=key,indents=0] The method above will use the parameter `brand` as the key and the parameter `name` as the value. @@ -16,14 +13,7 @@ The method above will use the parameter `brand` as the key and the parameter `na There are a number of ways you can include message headers. One way is to annotate an argument with the https://docs.micronaut.io/latest/api/io/micronaut/messaging/annotation/MessageHeader.html[@MessageHeader] annotation and include a value when calling the method: -[source,java] ----- -@Topic("my-products") -void sendProduct( - @KafkaKey String brand, - String name, - @MessageHeader("My-Header") String myHeader); ----- +snippet::io.micronaut.kafka.docs.producer.methods.ProductClient[tags=messageheader,indents=0] The example above will include the value of the `myHeader` argument as a header called `My-Header`. @@ -40,26 +30,14 @@ If the `my.application.token` is not set then an error will occur creating the c It is also possible to pass `Collection
` or `Headers` object as method arguments as seen below. .Collection
Argument -[source,java] ----- -@Topic("my-bicycles") -void sendBicycle( - @KafkaKey String brand, - String model, - Collection
headers); ----- + +snippet::io.micronaut.kafka.docs.producer.methods.ProductClient[tags=collectionheaders,indents=0] https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/org/apache/kafka/common/header/Header.html[Kafka Header Javadocs] .Headers Argument -[source,java] ----- -@Topic("my-bicycles") -void sendBicycle( - @KafkaKey String brand, - String model, - Headers headers); ----- + +snippet::io.micronaut.kafka.docs.producer.methods.ProductClient[tags=kafkaheaders,indents=0] https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/org/apache/kafka/common/header/Headers.html[Kafka Headers Javadocs] @@ -74,37 +52,19 @@ The following sections cover possible method signatures and behaviour: ==== Single Value and Return Type -[source,java] ----- -Single sendBook( - @KafkaKey String author, - Single book -); ----- +snippet::io.micronaut.kafka.docs.producer.methods.BookClient[tags=single,indents=0] The implementation will return a rx:Single[] that when subscribed to will subscribe to the passed rx:Single[] and send the emitted item as a `ProducerRecord` emitting the item again if successful or an error otherwise. ==== Flowable Value and Return Type -[source,java] ----- -Flowable sendBooks( - @KafkaKey String author, - Flowable book -); ----- +snippet::io.micronaut.kafka.docs.producer.methods.BookClient[tags=flowable,indents=0] The implementation will return a rx:Flowable[] that when subscribed to will subscribe to the passed rx:Flowable[] and for each emitted item will send a `ProducerRecord` emitting the item again if successful or an error otherwise. -==== Flowable Value and Return Type +==== Flux Value and Return Type -[source,java] ----- -Flux sendBooks( - @KafkaKey String author, - Flux book -); ----- +snippet::io.micronaut.kafka.docs.producer.methods.BookClient[tags=flux,indents=0] The implementation will return a Reactor `Flux` that when subscribed to will subscribe to the passed `Flux` and for each emitted item will send a `ProducerRecord` emitting the resulting Kafka `RecordMetadata` if successful or an error otherwise. diff --git a/test-suite-groovy/build.gradle.kts b/test-suite-groovy/build.gradle.kts index 95eb88570..38c80106e 100644 --- a/test-suite-groovy/build.gradle.kts +++ b/test-suite-groovy/build.gradle.kts @@ -6,6 +6,7 @@ plugins { dependencies { testImplementation(platform(mn.micronaut.core.bom)) testCompileOnly(mn.micronaut.inject.groovy) + testImplementation(mn.micronaut.messaging) testImplementation(mnSerde.micronaut.serde.jackson) testImplementation(libs.testcontainers.kafka) testImplementation(mnTest.micronaut.test.spock) diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/config/ClientIdClient.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/config/ClientIdClient.groovy new file mode 100644 index 000000000..191f4ef5b --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/config/ClientIdClient.groovy @@ -0,0 +1,12 @@ +package io.micronaut.kafka.docs.producer.config + +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.context.annotation.Requires + +@Requires(property = 'spec.name', value = 'ClientIdClientTest') +// tag::annotation[] +@KafkaClient('product-client') +// end::annotation[] +interface ClientIdClient { + // define client API +} diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/methods/Book.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/methods/Book.groovy new file mode 100644 index 000000000..dec7a9c45 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/methods/Book.groovy @@ -0,0 +1,8 @@ +package io.micronaut.kafka.docs.producer.methods + +import groovy.transform.Canonical + +@Canonical +class Book { + String title +} diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/methods/BookClient.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/methods/BookClient.groovy new file mode 100644 index 000000000..7c24df011 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/methods/BookClient.groovy @@ -0,0 +1,30 @@ +package io.micronaut.kafka.docs.producer.methods + +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.configuration.kafka.annotation.KafkaKey +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +import io.reactivex.Flowable +import io.reactivex.Single +import org.apache.kafka.clients.producer.RecordMetadata +import reactor.core.publisher.Flux + +@Requires(property = 'spec.name', value = 'BookClientTest') +@KafkaClient('product-client') +interface BookClient { + + // tag::single[] + @Topic('my-books') + Single sendBook(@KafkaKey String author, Single book); + // end::single[] + + // tag::flowable[] + @Topic('my-books') + Flowable sendBooks(@KafkaKey String author, Flowable book); + // end::flowable[] + + // tag::flux[] + @Topic('my-books') + Flux sendBooks(@KafkaKey String author, Flux book); + // end::flux[] +} diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/methods/ProductClient.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/methods/ProductClient.groovy new file mode 100644 index 000000000..070798d18 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/producer/methods/ProductClient.groovy @@ -0,0 +1,34 @@ +package io.micronaut.kafka.docs.producer.methods + +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.configuration.kafka.annotation.KafkaKey +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +import io.micronaut.messaging.annotation.MessageHeader +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.header.Headers + +@Requires(property = 'spec.name', value = 'ProductClientTest') +@KafkaClient('product-client') +interface ProductClient { + + // tag::key[] + @Topic('my-products') + void sendProduct(@KafkaKey String brand, String name) + // end::key[] + + // tag::messageheader[] + @Topic('my-products') + void sendProduct(@KafkaKey String brand, String name, @MessageHeader('My-Header') String myHeader) + // end::messageheader[] + + // tag::collectionheaders[] + @Topic('my-bicycles') + void sendBicycle(@KafkaKey String brand, String model, Collection
headers) + // end::collectionheaders[] + + // tag::kafkaheaders[] + @Topic('my-bicycles') + void sendBicycle(@KafkaKey String brand, String model, Headers headers) + // end::kafkaheaders[] +} diff --git a/test-suite-kotlin/build.gradle.kts b/test-suite-kotlin/build.gradle.kts index cf993cd09..fd3b466e2 100644 --- a/test-suite-kotlin/build.gradle.kts +++ b/test-suite-kotlin/build.gradle.kts @@ -7,6 +7,7 @@ plugins { dependencies { kaptTest(platform(mn.micronaut.core.bom)) kaptTest(mn.micronaut.inject.java) + testImplementation(mn.micronaut.messaging) testImplementation(mnSerde.micronaut.serde.jackson) testImplementation(libs.testcontainers.kafka) testImplementation(mnTest.micronaut.test.junit5) diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/config/ClientIdClient.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/config/ClientIdClient.kt new file mode 100644 index 000000000..26e9f085c --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/config/ClientIdClient.kt @@ -0,0 +1,12 @@ +package io.micronaut.kafka.docs.producer.config + +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.context.annotation.Requires + +@Requires(property = "spec.name", value = "ClientIdClientTest") +// tag::annotation[] +@KafkaClient("product-client") +// end::annotation[] +interface ClientIdClient { + // define client API +} diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/methods/Book.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/methods/Book.kt new file mode 100644 index 000000000..6f71a0521 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/methods/Book.kt @@ -0,0 +1,3 @@ +package io.micronaut.kafka.docs.producer.methods + +data class Book(val title: String) diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/methods/BookClient.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/methods/BookClient.kt new file mode 100644 index 000000000..2925f140d --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/methods/BookClient.kt @@ -0,0 +1,30 @@ +package io.micronaut.kafka.docs.producer.methods + +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.configuration.kafka.annotation.KafkaKey +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +import io.reactivex.Flowable +import io.reactivex.Single +import org.apache.kafka.clients.producer.RecordMetadata +import reactor.core.publisher.Flux + +@Requires(property = "spec.name", value = "BookClientTest") +@KafkaClient("product-client") +interface BookClient { + + // tag::single[] + @Topic("my-books") + fun sendBook(@KafkaKey author: String, book: Single): Single + // end::single[] + + // tag::flowable[] + @Topic("my-books") + fun sendBooks(@KafkaKey author: String, book: Flowable): Flowable + // end::flowable[] + + // tag::flux[] + @Topic("my-books") + fun sendBooks(@KafkaKey author: String, book: Flux): Flux + // end::flux[] +} diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/methods/ProductClient.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/methods/ProductClient.kt new file mode 100644 index 000000000..8ddae3eb6 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/producer/methods/ProductClient.kt @@ -0,0 +1,34 @@ +package io.micronaut.kafka.docs.producer.methods + +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.configuration.kafka.annotation.KafkaKey +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +import io.micronaut.messaging.annotation.MessageHeader +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.header.Headers + +@Requires(property = "spec.name", value = "ProductClientTest") +@KafkaClient("product-client") +interface ProductClient { + + // tag::key[] + @Topic("my-products") + fun sendProduct(@KafkaKey brand: String, name: String) + // end::key[] + + // tag::messageheader[] + @Topic("my-products") + fun sendProduct(@KafkaKey brand: String, name: String, @MessageHeader("My-Header") myHeader: String) + // end::messageheader[] + + // tag::collectionheaders[] + @Topic("my-bicycles") + fun sendBicycle(@KafkaKey brand: String, model: String, headers: Collection
) + // end::collectionheaders[] + + // tag::kafkaheaders[] + @Topic("my-bicycles") + fun sendBicycle(@KafkaKey brand: String, model: String, headers: Headers) + // end::kafkaheaders[] +} diff --git a/test-suite/build.gradle.kts b/test-suite/build.gradle.kts index d466d8bcb..013915a25 100644 --- a/test-suite/build.gradle.kts +++ b/test-suite/build.gradle.kts @@ -6,6 +6,7 @@ plugins { dependencies { testAnnotationProcessor(platform(mn.micronaut.core.bom)) testAnnotationProcessor(mn.micronaut.inject.java) + testImplementation(mn.micronaut.messaging) testImplementation(mnSerde.micronaut.serde.jackson) testImplementation(libs.testcontainers.kafka) testImplementation(mnTest.micronaut.test.junit5) diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/producer/config/ClientIdClient.java b/test-suite/src/test/java/io/micronaut/kafka/docs/producer/config/ClientIdClient.java new file mode 100644 index 000000000..db7b6323d --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/producer/config/ClientIdClient.java @@ -0,0 +1,12 @@ +package io.micronaut.kafka.docs.producer.config; + +import io.micronaut.configuration.kafka.annotation.KafkaClient; +import io.micronaut.context.annotation.Requires; + +@Requires(property = "spec.name", value = "ClientIdClientTest") +// tag::annotation[] +@KafkaClient("product-client") +// end::annotation[] +public interface ClientIdClient { + // define client API +} diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/producer/methods/Book.java b/test-suite/src/test/java/io/micronaut/kafka/docs/producer/methods/Book.java new file mode 100644 index 000000000..cffe88979 --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/producer/methods/Book.java @@ -0,0 +1,7 @@ +package io.micronaut.kafka.docs.producer.methods; + +import io.micronaut.serde.annotation.Serdeable; + +@Serdeable +public record Book(String title) { +} diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/producer/methods/BookClient.java b/test-suite/src/test/java/io/micronaut/kafka/docs/producer/methods/BookClient.java new file mode 100644 index 000000000..66db99b6d --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/producer/methods/BookClient.java @@ -0,0 +1,30 @@ +package io.micronaut.kafka.docs.producer.methods; + +import io.micronaut.configuration.kafka.annotation.KafkaClient; +import io.micronaut.configuration.kafka.annotation.KafkaKey; +import io.micronaut.configuration.kafka.annotation.Topic; +import io.micronaut.context.annotation.Requires; +import io.reactivex.Flowable; +import io.reactivex.Single; +import org.apache.kafka.clients.producer.RecordMetadata; +import reactor.core.publisher.Flux; + +@Requires(property = "spec.name", value = "BookClientTest") +@KafkaClient("product-client") +public interface BookClient { + + // tag::single[] + @Topic("my-books") + Single sendBook(@KafkaKey String author, Single book); + // end::single[] + + // tag::flowable[] + @Topic("my-books") + Flowable sendBooks(@KafkaKey String author, Flowable book); + // end::flowable[] + + // tag::flux[] + @Topic("my-books") + Flux sendBooks(@KafkaKey String author, Flux book); + // end::flux[] +} diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/producer/methods/ProductClient.java b/test-suite/src/test/java/io/micronaut/kafka/docs/producer/methods/ProductClient.java new file mode 100644 index 000000000..f2f940c09 --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/producer/methods/ProductClient.java @@ -0,0 +1,36 @@ +package io.micronaut.kafka.docs.producer.methods; + +import io.micronaut.configuration.kafka.annotation.KafkaClient; +import io.micronaut.configuration.kafka.annotation.KafkaKey; +import io.micronaut.configuration.kafka.annotation.Topic; +import io.micronaut.context.annotation.Requires; +import io.micronaut.messaging.annotation.MessageHeader; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; + +import java.util.Collection; + +@Requires(property = "spec.name", value = "ProductClientTest") +@KafkaClient("product-client") +public interface ProductClient { + + // tag::key[] + @Topic("my-products") + void sendProduct(@KafkaKey String brand, String name); + // end::key[] + + // tag::messageheader[] + @Topic("my-products") + void sendProduct(@KafkaKey String brand, String name, @MessageHeader("My-Header") String myHeader); + // end::messageheader[] + + // tag::collectionheaders[] + @Topic("my-bicycles") + void sendBicycle(@KafkaKey String brand, String model, Collection
headers); + // end::collectionheaders[] + + // tag::kafkaheaders[] + @Topic("my-bicycles") + void sendBicycle(@KafkaKey String brand, String model, Headers headers); + // end::kafkaheaders[] +}