diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java
index 08b6819be..ccad3b7cb 100644
--- a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java
@@ -59,6 +59,11 @@ public BatchConsumerRecordsBinderRegistry(ConsumerRecordBinderRegistry consumerR
     @Override
     public <T> Optional<ArgumentBinder<T, ConsumerRecords<?, ?>>> findArgumentBinder(Argument<T> argument) {
         Class<T> argType = argument.getType();
+
+        if (ConsumerRecords.class.equals(argType)) {
+            return Optional.of((context, consumerRecords) -> () -> (Optional<T>) Optional.of(consumerRecords));
+        }
+
         if (Iterable.class.isAssignableFrom(argType) || argType.isArray() || Publishers.isConvertibleToPublisher(argType)) {
             Argument<?> batchType = argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT);
             List bound = new ArrayList();
@@ -74,7 +79,6 @@ public BatchConsumerRecordsBinderRegistry(ConsumerRecordBinderRegistry consumerR
                     if (result.isPresentAndSatisfied()) {
                         bound.add(result.get());
                     }
-
                 });
             }
             return () -> {
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java
index f59e55b48..5f5377092 100644
--- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java
@@ -517,7 +517,7 @@ private static Optional<ConsumerRebalanceListener> getConsumerRebalanceListener(
 
     private static Argument<?> findBodyArgument(ExecutableMethod<?, ?> method) {
         return Arrays.stream(method.getArguments())
-            .filter(arg -> arg.getType() == ConsumerRecord.class || arg.getAnnotationMetadata().hasAnnotation(MessageBody.class))
+            .filter(arg -> isConsumerRecord(arg) || arg.getAnnotationMetadata().hasAnnotation(MessageBody.class))
             .findFirst()
             .orElseGet(() -> Arrays.stream(method.getArguments())
                 .filter(arg -> !arg.getAnnotationMetadata().hasStereotype(Bindable.class)
@@ -528,7 +528,12 @@ private static Argument<?> findBodyArgument(ExecutableMethod<?, ?> method) {
 
     private static Argument<?> findBodyArgument(boolean batch, ExecutableMethod<?, ?> method) {
         final Argument<?> tempBodyArg = findBodyArgument(method);
-        return batch && tempBodyArg != null ? getComponentType(tempBodyArg) : tempBodyArg;
+
+        if (batch && tempBodyArg != null) {
+            return isConsumerRecord(tempBodyArg) ? tempBodyArg : getComponentType(tempBodyArg);
+        }
+
+        return tempBodyArg;
     }
 
     private static boolean isLastArgumentOfSuspendedMethod(Argument<?> argument, ExecutableMethod<?, ?> method) {
@@ -578,11 +583,13 @@ private void configureValueDeserializer(Argument<?> bodyArgument, DefaultKafkaCo
     }
 
     private static boolean isConsumerRecord(@NonNull Argument<?> body) {
-        return ConsumerRecord.class.isAssignableFrom(body.getType());
+        return ConsumerRecord.class.isAssignableFrom(body.getType()) ||
+            ConsumerRecords.class.isAssignableFrom(body.getType());
     }
 
     private static Argument<?> getComponentType(final Argument<?> argument) {
         final Class<?> argumentType = argument.getType();
+
         return argumentType.isArray() ?
             Argument.of(argumentType.getComponentType()) :
             argument.getFirstTypeVariable().orElse(argument);
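Taken together, the binder change (bind a `ConsumerRecords` argument directly) and the processor change (treat `ConsumerRecords` as a body argument and skip component-type unwrapping) enable listener signatures like the following. This is a minimal illustrative sketch, not code from this patch: the class name and topic are invented, and it assumes micronaut-kafka's `@KafkaListener`/`@Topic` annotations on the classpath.

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

@KafkaListener(batch = true)
public class BookBatchListener {

    // The generic types drive deserializer selection, so declare them explicitly
    @Topic("books")
    public void receive(ConsumerRecords<String, String> records) {
        // ConsumerRecords is Iterable, and also exposes partitions()/records(partition)
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("%s -> %s%n", record.key(), record.value());
        }
    }
}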
sendToReceiveAsConsumerRecord(@KafkaKey String key, @MessageBody Book book) + void sendToReceiveAsListOfConsumerRecord(@KafkaKey String key, @MessageBody Book book) + + @Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORDS_TOPIC) + void sendToReceiveAsConsumerRecords(@KafkaKey String key, @MessageBody Book book) } @@ -270,9 +293,15 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec { } @Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC) - void receiveConsumerRecords(List> books) { - this.keys.addAll(books.collect { it.key() }) - this.books.addAll(books.collect { it.value() }) + void receiveListOfConsumerRecord(List> consumerRecords) { + this.keys.addAll(consumerRecords.collect { it.key() }) + this.books.addAll(consumerRecords.collect { it.value() }) + } + + @Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORDS_TOPIC) + void receiveConsumerRecords(ConsumerRecords consumerRecords) { + this.keys.addAll(consumerRecords.collect { it.key() }) + this.books.addAll(consumerRecords.collect { it.value() }) } } diff --git a/src/main/docs/guide/kafkaListener/kafkaListenerBatch.adoc b/src/main/docs/guide/kafkaListener/kafkaListenerBatch.adoc index c3d83d5a0..4354ee52c 100644 --- a/src/main/docs/guide/kafkaListener/kafkaListenerBatch.adoc +++ b/src/main/docs/guide/kafkaListener/kafkaListenerBatch.adoc @@ -42,6 +42,23 @@ snippet::io.micronaut.kafka.docs.consumer.batch.manual.BookListener[tags=method, This example is fairly trivial in that it commits offsets after processing each record in a batch, but you can for example commit after processing every 10, or every 100 or whatever makes sense for your application. +== Receiving a ConsumerRecords + +When batching you can receive the entire `ConsumerRecords` object being listened for. In this case you should specify appropriate generic types for the key and value of the `ConsumerRecords` so that Micronaut can pick the correct deserializer for each. + +This is useful when the need is to process or commit the records by partition, as the `ConsumerRecords` object already groups records by partition: + +.Commit only once for each partition + +snippet::io.micronaut.kafka.docs.consumer.batch.manual.BookListener[tags=consumerRecords, indent=0] + +<1> Committing offsets automatically is disabled +<2> The method receives the batch of books as a `ConsumerRecords` holder object +<3> Each partition is iterated over +<4> Each record for the partition is processed +<5> The last read offset for the partition is stored +<6> The offset is committed once for each partition + == Reactive Batch Processing Batch listeners also support defining reactive types (Reactor `Flux` or RxJava rx:Flowable[]) as the method argument. 
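The docs hunk above leaves "commit after every 10, or every 100" as prose, with no accompanying snippet. As a rough sketch of that periodic-commit variant (a hypothetical listener, not part of this patch; the topic name and batch size of 10 are invented), a listener with automatic offset commits disabled could track a counter and commit every N records:

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetStrategy;
import io.micronaut.configuration.kafka.annotation.Topic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.List;

@KafkaListener(offsetStrategy = OffsetStrategy.DISABLED, batch = true)
public class PeriodicCommitListener {

    @Topic("all-the-books")
    public void receive(List<ConsumerRecord<String, String>> records, Consumer<?, ?> kafkaConsumer) {
        int processed = 0;
        for (ConsumerRecord<String, String> record : records) {
            // ... process the record ...
            processed++;
            if (processed % 10 == 0) { // commit after every 10 records
                kafkaConsumer.commitSync(Collections.singletonMap(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)
                ));
            }
        }
    }
}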
diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.groovy
index 8413a8794..befb25d7b 100644
--- a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.groovy
+++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.groovy
@@ -6,6 +6,7 @@ import io.micronaut.context.annotation.Requires
 import io.micronaut.kafka.docs.consumer.batch.Book
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.ConsumerRecords
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.TopicPartition
 
@@ -38,4 +39,27 @@ class BookListener {
         }
     }
     // end::method[]
+
+    // tag::consumerRecords[]
+    @KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // <1>
+    @Topic("all-the-books")
+    void receiveConsumerRecords(ConsumerRecords<String, Book> consumerRecords, Consumer kafkaConsumer) { // <2>
+        for (TopicPartition partition : consumerRecords.partitions()) { // <3>
+            long offset = Long.MIN_VALUE
+            // process partition records
+            for (ConsumerRecord<String, Book> record : consumerRecords.records(partition)) { // <4>
+                // process the book
+                Book book = record.value()
+                // keep last offset
+                offset = record.offset() // <5>
+            }
+
+            // commit partition offset
+            kafkaConsumer.commitSync(Collections.singletonMap( // <6>
+                partition,
+                new OffsetAndMetadata(offset + 1, "my metadata")
+            ))
+        }
+    }
+    // end::consumerRecords[]
 }
diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.kt
index 14e0d2982..35a736c54 100644
--- a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.kt
+++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.kt
@@ -9,6 +9,7 @@ import io.micronaut.context.annotation.Requires
 import io.micronaut.kafka.docs.consumer.batch.Book
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.ConsumerRecords
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.TopicPartition
 
@@ -41,5 +42,30 @@ internal class BookListener {
         }
     }
     // end::method[]
+
+    // tag::consumerRecords[]
+    @KafkaListener(offsetReset = OffsetReset.EARLIEST, offsetStrategy = OffsetStrategy.DISABLED, batch = true) // <1>
+    @Topic("all-the-books")
+    fun receiveConsumerRecords(consumerRecords: ConsumerRecords<String, Book>, kafkaConsumer: Consumer<*, *>) { // <2>
+        for (partition in consumerRecords.partitions()) { // <3>
+            var offset = Long.MIN_VALUE
+            // process partition records
+            for (record in consumerRecords.records(partition)) { // <4>
+                // process the book
+                val book = record.value()
+                // keep last offset
+                offset = record.offset() // <5>
+            }
+
+            // commit partition offset
+            kafkaConsumer.commitSync(
+                Collections.singletonMap( // <6>
+                    partition,
+                    OffsetAndMetadata(offset + 1, "my metadata")
+                )
+            )
+        }
+    }
+    // end::consumerRecords[]
 
 }
diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.java b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.java
index 73dae51e6..38277726d 100644
--- a/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.java
+++ b/test-suite/src/test/java/io/micronaut/kafka/docs/consumer/batch/manual/BookListener.java
@@ -6,6 +6,7 @@ import io.micronaut.kafka.docs.consumer.batch.Book;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
@@ -42,4 +43,27 @@ public void receive(List<ConsumerRecord<String, Book>> records, Consumer<String, Book> kafkaCo
         }
     }
     // end::method[]
+
+    // tag::consumerRecords[]
+    @KafkaListener(offsetReset = EARLIEST, offsetStrategy = DISABLED, batch = true) // <1>
+    @Topic("all-the-books")
+    public void receiveConsumerRecords(ConsumerRecords<String, Book> consumerRecords, Consumer<String, Book> kafkaConsumer) { // <2>
+        for (TopicPartition partition : consumerRecords.partitions()) { // <3>
+            long offset = Long.MIN_VALUE;
+            // process partition records
+            for (ConsumerRecord<String, Book> record : consumerRecords.records(partition)) { // <4>
+                // process the book
+                Book book = record.value();
+                // keep last offset
+                offset = record.offset(); // <5>
+            }
+
+            // commit partition offset
+            kafkaConsumer.commitSync(Collections.singletonMap( // <6>
+                partition,
+                new OffsetAndMetadata(offset + 1, "my metadata")
+            ));
+        }
+    }
+    // end::consumerRecords[]
 }
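A note on the `offset + 1` at callout <6> in the examples above: Kafka's `commitSync` expects the offset of the next record to be consumed, not the last one processed. If a test wanted to verify what was committed, a helper along these lines could read it back (a hypothetical sketch, not part of this patch; assumes a Kafka client version with `committed(Set<TopicPartition>)`, available since 2.4):

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;

final class CommitCheck {

    // Reads back the committed offset for a partition; returns -1 if nothing was committed yet
    static long committedOffset(Consumer<?, ?> consumer, TopicPartition partition) {
        Map<TopicPartition, OffsetAndMetadata> committed =
            consumer.committed(Collections.singleton(partition));
        OffsetAndMetadata metadata = committed.get(partition);
        return metadata == null ? -1L : metadata.offset();
    }
}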