From 0e52c61053ecf97717c27747e3f365b9b4982706 Mon Sep 17 00:00:00 2001 From: rrueda Date: Tue, 24 Oct 2023 19:04:54 -0300 Subject: [PATCH] Support for ConsumerRecords container type on batch processing --- .../BatchConsumerRecordsBinderRegistry.java | 6 ++- .../processor/KafkaConsumerProcessor.java | 13 ++++-- .../annotation/KafkaBatchListenerSpec.groovy | 45 +++++++++++++++---- 3 files changed, 52 insertions(+), 12 deletions(-) diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java index 08b6819be..ccad3b7cb 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java @@ -59,6 +59,11 @@ public BatchConsumerRecordsBinderRegistry(ConsumerRecordBinderRegistry consumerR @Override public Optional>> findArgumentBinder(Argument argument) { Class argType = argument.getType(); + + if (ConsumerRecords.class.equals(argType)) { + return Optional.of((context, consumerRecords) -> () -> (Optional) Optional.of(consumerRecords)); + } + if (Iterable.class.isAssignableFrom(argType) || argType.isArray() || Publishers.isConvertibleToPublisher(argType)) { Argument batchType = argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT); List bound = new ArrayList(); @@ -74,7 +79,6 @@ public BatchConsumerRecordsBinderRegistry(ConsumerRecordBinderRegistry consumerR if (result.isPresentAndSatisfied()) { bound.add(result.get()); } - }); } return () -> { diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java index f59e55b48..5f5377092 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java @@ -517,7 +517,7 @@ private static Optional getConsumerRebalanceListener( private static Argument findBodyArgument(ExecutableMethod method) { return Arrays.stream(method.getArguments()) - .filter(arg -> arg.getType() == ConsumerRecord.class || arg.getAnnotationMetadata().hasAnnotation(MessageBody.class)) + .filter(arg -> isConsumerRecord(arg) || arg.getAnnotationMetadata().hasAnnotation(MessageBody.class)) .findFirst() .orElseGet(() -> Arrays.stream(method.getArguments()) .filter(arg -> !arg.getAnnotationMetadata().hasStereotype(Bindable.class) @@ -528,7 +528,12 @@ private static Argument findBodyArgument(ExecutableMethod method) { private static Argument findBodyArgument(boolean batch, ExecutableMethod method) { final Argument tempBodyArg = findBodyArgument(method); - return batch && tempBodyArg != null ? getComponentType(tempBodyArg) : tempBodyArg; + + if (batch && tempBodyArg != null) { + return isConsumerRecord(tempBodyArg) ? tempBodyArg : getComponentType(tempBodyArg); + } + + return tempBodyArg; } private static boolean isLastArgumentOfSuspendedMethod(Argument argument, ExecutableMethod method) { @@ -578,11 +583,13 @@ private void configureValueDeserializer(Argument bodyArgument, DefaultKafkaCo } private static boolean isConsumerRecord(@NonNull Argument body) { - return ConsumerRecord.class.isAssignableFrom(body.getType()); + return ConsumerRecord.class.isAssignableFrom(body.getType()) || + ConsumerRecords.class.isAssignableFrom(body.getType()); } private static Argument getComponentType(final Argument argument) { final Class argumentType = argument.getType(); + return argumentType.isArray() ? Argument.of(argumentType.getComponentType()) : argument.getFirstTypeVariable().orElse(argument); diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaBatchListenerSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaBatchListenerSpec.groovy index cf7e83569..2cfbfd59b 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaBatchListenerSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaBatchListenerSpec.groovy @@ -9,6 +9,7 @@ import io.micronaut.messaging.annotation.MessageHeader import io.micronaut.messaging.annotation.SendTo import io.micronaut.serde.annotation.Serdeable import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.ConsumerRecords import reactor.core.publisher.Flux import spock.lang.Retry @@ -20,7 +21,8 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec { public static final String BOOKS_TOPIC = 'KafkaBatchListenerSpec-books' public static final String BOOKS_LIST_TOPIC = 'KafkaBatchListenerSpec-books-list' - public static final String BOOK_CONSUMER_RECORD_LIST_TOPIC = 'KafkaBatchListenerSpec-consumer-records' + public static final String BOOK_CONSUMER_RECORD_LIST_TOPIC = 'KafkaBatchListenerSpec-consumer-record-list' + public static final String BOOK_CONSUMER_RECORDS_TOPIC = 'KafkaBatchListenerSpec-consumer-records' public static final String BOOKS_HEADERS_TOPIC = 'KafkaBatchListenerSpec-books-headers' public static final String BOOKS_FLUX_TOPIC = 'KafkaBatchListenerSpec-books-flux' public static final String BOOKS_FORWARD_LIST_TOPIC = 'KafkaBatchListenerSpec-books-forward-list' @@ -169,7 +171,7 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec { } } - void "test keys and values deserialized to the correct type when receiving a batch of ConsumerRecord"() { + void "test keys and values deserialized to the correct type when receiving a batch with a list of ConsumerRecord"() { given: MyBatchClient myBatchClient = context.getBean(MyBatchClient) BookListener bookListener = context.getBean(BookListener) @@ -177,8 +179,26 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec { bookListener.keys?.clear() when: - myBatchClient.sendToReceiveAsConsumerRecord("book-1", new Book(title: "The Flowable")) - myBatchClient.sendToReceiveAsConsumerRecord("book-2", new Book(title: "The Shining")) + myBatchClient.sendToReceiveAsListOfConsumerRecord("book-1", new Book(title: "The Flowable")) + myBatchClient.sendToReceiveAsListOfConsumerRecord("book-2", new Book(title: "The Shining")) + + then: + conditions.eventually { + bookListener.books == [new Book(title: "The Flowable"), new Book(title: "The Shining")] + bookListener.keys == ["book-1", "book-2"] + } + } + + void "test keys and values deserialized to the correct type when receiving a batch of ConsumerRecords"() { + given: + MyBatchClient myBatchClient = context.getBean(MyBatchClient) + BookListener bookListener = context.getBean(BookListener) + bookListener.books?.clear() + bookListener.keys?.clear() + + when: + myBatchClient.sendToReceiveAsConsumerRecords("book-1", new Book(title: "The Flowable")) + myBatchClient.sendToReceiveAsConsumerRecords("book-2", new Book(title: "The Shining")) then: conditions.eventually { @@ -215,7 +235,10 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec { void sendBooksFlux(Flux books) @Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC) - void sendToReceiveAsConsumerRecord(@KafkaKey String key, @MessageBody Book book) + void sendToReceiveAsListOfConsumerRecord(@KafkaKey String key, @MessageBody Book book) + + @Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORDS_TOPIC) + void sendToReceiveAsConsumerRecords(@KafkaKey String key, @MessageBody Book book) } @@ -270,9 +293,15 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec { } @Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC) - void receiveConsumerRecords(List> books) { - this.keys.addAll(books.collect { it.key() }) - this.books.addAll(books.collect { it.value() }) + void receiveListOfConsumerRecord(List> consumerRecords) { + this.keys.addAll(consumerRecords.collect { it.key() }) + this.books.addAll(consumerRecords.collect { it.value() }) + } + + @Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORDS_TOPIC) + void receiveConsumerRecords(ConsumerRecords consumerRecords) { + this.keys.addAll(consumerRecords.collect { it.key() }) + this.books.addAll(consumerRecords.collect { it.value() }) } }