diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java index 08b6819be..ccad3b7cb 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/bind/batch/BatchConsumerRecordsBinderRegistry.java @@ -59,6 +59,11 @@ public BatchConsumerRecordsBinderRegistry(ConsumerRecordBinderRegistry consumerR @Override public Optional>> findArgumentBinder(Argument argument) { Class argType = argument.getType(); + + if (ConsumerRecords.class.equals(argType)) { + return Optional.of((context, consumerRecords) -> () -> (Optional) Optional.of(consumerRecords)); + } + if (Iterable.class.isAssignableFrom(argType) || argType.isArray() || Publishers.isConvertibleToPublisher(argType)) { Argument batchType = argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT); List bound = new ArrayList(); @@ -74,7 +79,6 @@ public BatchConsumerRecordsBinderRegistry(ConsumerRecordBinderRegistry consumerR if (result.isPresentAndSatisfied()) { bound.add(result.get()); } - }); } return () -> { diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java index a45c0e782..b379f2c0b 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java @@ -518,20 +518,25 @@ private static Optional getConsumerRebalanceListener( return Optional.empty(); } - private static Argument findBodyArgument(ExecutableMethod method) { + private static Optional> findBodyArgument(ExecutableMethod method) { return Arrays.stream(method.getArguments()) - .filter(arg -> arg.getType() == ConsumerRecord.class || arg.getAnnotationMetadata().hasAnnotation(MessageBody.class)) + .filter(arg -> isConsumerRecord(arg) || arg.getAnnotationMetadata().hasAnnotation(MessageBody.class)) .findFirst() - .orElseGet(() -> Arrays.stream(method.getArguments()) + .or(() -> Arrays.stream(method.getArguments()) .filter(arg -> !arg.getAnnotationMetadata().hasStereotype(Bindable.class) && !isLastArgumentOfSuspendedMethod(arg, method)) - .findFirst() - .orElse(null)); + .findFirst()); } - private static Argument findBodyArgument(boolean batch, ExecutableMethod method) { - final Argument tempBodyArg = findBodyArgument(method); - return batch && tempBodyArg != null ? getComponentType(tempBodyArg) : tempBodyArg; + private static Optional> findBodyArgument(boolean batch, ExecutableMethod method) { + return findBodyArgument(method) + .map(arg -> { + if (isConsumerRecord(arg)) { + return arg; + } else { + return batch ? getComponentType(arg) : arg; + } + }); } private static boolean isLastArgumentOfSuspendedMethod(Argument argument, ExecutableMethod method) { @@ -544,20 +549,20 @@ private static boolean isLastArgumentOfSuspendedMethod(Argument argument, Exe private void configureDeserializers(final ExecutableMethod method, final DefaultKafkaConsumerConfiguration config) { final boolean batch = method.isTrue(KafkaListener.class, "batch"); - final Argument bodyArgument = findBodyArgument(batch, method); + final Optional> bodyArgument = findBodyArgument(batch, method); configureKeyDeserializer(bodyArgument, method, config); configureValueDeserializer(bodyArgument, config); debugDeserializationConfiguration(method, config); } @SuppressWarnings({ "rawtypes", "unchecked" }) - private void configureKeyDeserializer(Argument bodyArgument, ExecutableMethod method, DefaultKafkaConsumerConfiguration config) { + private void configureKeyDeserializer(Optional> bodyArgument, ExecutableMethod method, DefaultKafkaConsumerConfiguration config) { if (!config.getConfig().containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) && config.getKeyDeserializer().isEmpty()) { // figure out the Key deserializer Arrays.stream(method.getArguments()) .filter(arg -> arg.isAnnotationPresent(KafkaKey.class)) .findFirst() - .or(() -> Optional.ofNullable(bodyArgument) + .or(() -> bodyArgument .filter(KafkaConsumerProcessor::isConsumerRecord) .flatMap(b -> b.getTypeVariable("K"))) .map(serdeRegistry::pickDeserializer) @@ -567,13 +572,12 @@ private void configureKeyDeserializer(Argument bodyArgument, ExecutableMethod } @SuppressWarnings({ "rawtypes", "unchecked" }) - private void configureValueDeserializer(Argument bodyArgument, DefaultKafkaConsumerConfiguration config) { + private void configureValueDeserializer(Optional> bodyArgument, DefaultKafkaConsumerConfiguration config) { if (!config.getConfig().containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) && config.getValueDeserializer().isEmpty()) { // figure out the Value deserializer - final Optional> body = Optional.ofNullable(bodyArgument); - body.filter(KafkaConsumerProcessor::isConsumerRecord) + bodyArgument.filter(KafkaConsumerProcessor::isConsumerRecord) .flatMap(b -> b.getTypeVariable("V")) - .or(() -> body) + .or(() -> bodyArgument) .map(serdeRegistry::pickDeserializer) .ifPresentOrElse(config::setValueDeserializer, () -> config.setValueDeserializer(DEFAULT_VALUE_DESERIALIZER)); @@ -581,11 +585,13 @@ private void configureValueDeserializer(Argument bodyArgument, DefaultKafkaCo } private static boolean isConsumerRecord(@NonNull Argument body) { - return ConsumerRecord.class.isAssignableFrom(body.getType()); + return ConsumerRecord.class.isAssignableFrom(body.getType()) || + ConsumerRecords.class.isAssignableFrom(body.getType()); } private static Argument getComponentType(final Argument argument) { final Class argumentType = argument.getType(); + return argumentType.isArray() ? Argument.of(argumentType.getComponentType()) : argument.getFirstTypeVariable().orElse(argument); diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaBatchListenerSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaBatchListenerSpec.groovy index cf7e83569..2cfbfd59b 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaBatchListenerSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/KafkaBatchListenerSpec.groovy @@ -9,6 +9,7 @@ import io.micronaut.messaging.annotation.MessageHeader import io.micronaut.messaging.annotation.SendTo import io.micronaut.serde.annotation.Serdeable import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.ConsumerRecords import reactor.core.publisher.Flux import spock.lang.Retry @@ -20,7 +21,8 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec { public static final String BOOKS_TOPIC = 'KafkaBatchListenerSpec-books' public static final String BOOKS_LIST_TOPIC = 'KafkaBatchListenerSpec-books-list' - public static final String BOOK_CONSUMER_RECORD_LIST_TOPIC = 'KafkaBatchListenerSpec-consumer-records' + public static final String BOOK_CONSUMER_RECORD_LIST_TOPIC = 'KafkaBatchListenerSpec-consumer-record-list' + public static final String BOOK_CONSUMER_RECORDS_TOPIC = 'KafkaBatchListenerSpec-consumer-records' public static final String BOOKS_HEADERS_TOPIC = 'KafkaBatchListenerSpec-books-headers' public static final String BOOKS_FLUX_TOPIC = 'KafkaBatchListenerSpec-books-flux' public static final String BOOKS_FORWARD_LIST_TOPIC = 'KafkaBatchListenerSpec-books-forward-list' @@ -169,7 +171,7 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec { } } - void "test keys and values deserialized to the correct type when receiving a batch of ConsumerRecord"() { + void "test keys and values deserialized to the correct type when receiving a batch with a list of ConsumerRecord"() { given: MyBatchClient myBatchClient = context.getBean(MyBatchClient) BookListener bookListener = context.getBean(BookListener) @@ -177,8 +179,26 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec { bookListener.keys?.clear() when: - myBatchClient.sendToReceiveAsConsumerRecord("book-1", new Book(title: "The Flowable")) - myBatchClient.sendToReceiveAsConsumerRecord("book-2", new Book(title: "The Shining")) + myBatchClient.sendToReceiveAsListOfConsumerRecord("book-1", new Book(title: "The Flowable")) + myBatchClient.sendToReceiveAsListOfConsumerRecord("book-2", new Book(title: "The Shining")) + + then: + conditions.eventually { + bookListener.books == [new Book(title: "The Flowable"), new Book(title: "The Shining")] + bookListener.keys == ["book-1", "book-2"] + } + } + + void "test keys and values deserialized to the correct type when receiving a batch of ConsumerRecords"() { + given: + MyBatchClient myBatchClient = context.getBean(MyBatchClient) + BookListener bookListener = context.getBean(BookListener) + bookListener.books?.clear() + bookListener.keys?.clear() + + when: + myBatchClient.sendToReceiveAsConsumerRecords("book-1", new Book(title: "The Flowable")) + myBatchClient.sendToReceiveAsConsumerRecords("book-2", new Book(title: "The Shining")) then: conditions.eventually { @@ -215,7 +235,10 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec { void sendBooksFlux(Flux books) @Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC) - void sendToReceiveAsConsumerRecord(@KafkaKey String key, @MessageBody Book book) + void sendToReceiveAsListOfConsumerRecord(@KafkaKey String key, @MessageBody Book book) + + @Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORDS_TOPIC) + void sendToReceiveAsConsumerRecords(@KafkaKey String key, @MessageBody Book book) } @@ -270,9 +293,15 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec { } @Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC) - void receiveConsumerRecords(List> books) { - this.keys.addAll(books.collect { it.key() }) - this.books.addAll(books.collect { it.value() }) + void receiveListOfConsumerRecord(List> consumerRecords) { + this.keys.addAll(consumerRecords.collect { it.key() }) + this.books.addAll(consumerRecords.collect { it.value() }) + } + + @Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORDS_TOPIC) + void receiveConsumerRecords(ConsumerRecords consumerRecords) { + this.keys.addAll(consumerRecords.collect { it.key() }) + this.books.addAll(consumerRecords.collect { it.value() }) } }