Skip to content

Commit

Permalink
Support for ConsumerRecords container type on batch processing
Browse files Browse the repository at this point in the history
  • Loading branch information
rorueda committed Nov 13, 2023
1 parent 0f0a00a commit 0e52c61
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public BatchConsumerRecordsBinderRegistry(ConsumerRecordBinderRegistry consumerR
@Override
public <T> Optional<ArgumentBinder<T, ConsumerRecords<?, ?>>> findArgumentBinder(Argument<T> argument) {
Class<T> argType = argument.getType();

if (ConsumerRecords.class.equals(argType)) {
return Optional.of((context, consumerRecords) -> () -> (Optional<T>) Optional.of(consumerRecords));
}

if (Iterable.class.isAssignableFrom(argType) || argType.isArray() || Publishers.isConvertibleToPublisher(argType)) {
Argument<?> batchType = argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT);
List bound = new ArrayList();
Expand All @@ -74,7 +79,6 @@ public BatchConsumerRecordsBinderRegistry(ConsumerRecordBinderRegistry consumerR
if (result.isPresentAndSatisfied()) {
bound.add(result.get());
}

});
}
return () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ private static Optional<ConsumerRebalanceListener> getConsumerRebalanceListener(

private static Argument<?> findBodyArgument(ExecutableMethod<?, ?> method) {
return Arrays.stream(method.getArguments())
.filter(arg -> arg.getType() == ConsumerRecord.class || arg.getAnnotationMetadata().hasAnnotation(MessageBody.class))
.filter(arg -> isConsumerRecord(arg) || arg.getAnnotationMetadata().hasAnnotation(MessageBody.class))
.findFirst()
.orElseGet(() -> Arrays.stream(method.getArguments())
.filter(arg -> !arg.getAnnotationMetadata().hasStereotype(Bindable.class)
Expand All @@ -528,7 +528,12 @@ private static Argument<?> findBodyArgument(ExecutableMethod<?, ?> method) {

private static Argument<?> findBodyArgument(boolean batch, ExecutableMethod<?, ?> method) {
final Argument<?> tempBodyArg = findBodyArgument(method);
return batch && tempBodyArg != null ? getComponentType(tempBodyArg) : tempBodyArg;

if (batch && tempBodyArg != null) {
return isConsumerRecord(tempBodyArg) ? tempBodyArg : getComponentType(tempBodyArg);
}

return tempBodyArg;
}

private static boolean isLastArgumentOfSuspendedMethod(Argument<?> argument, ExecutableMethod<?, ?> method) {
Expand Down Expand Up @@ -578,11 +583,13 @@ private void configureValueDeserializer(Argument<?> bodyArgument, DefaultKafkaCo
}

private static boolean isConsumerRecord(@NonNull Argument<?> body) {
return ConsumerRecord.class.isAssignableFrom(body.getType());
return ConsumerRecord.class.isAssignableFrom(body.getType()) ||
ConsumerRecords.class.isAssignableFrom(body.getType());
}

private static Argument<?> getComponentType(final Argument<?> argument) {
final Class<?> argumentType = argument.getType();

return argumentType.isArray()
? Argument.of(argumentType.getComponentType())
: argument.getFirstTypeVariable().orElse(argument);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.messaging.annotation.SendTo
import io.micronaut.serde.annotation.Serdeable
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import reactor.core.publisher.Flux
import spock.lang.Retry

Expand All @@ -20,7 +21,8 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {

public static final String BOOKS_TOPIC = 'KafkaBatchListenerSpec-books'
public static final String BOOKS_LIST_TOPIC = 'KafkaBatchListenerSpec-books-list'
public static final String BOOK_CONSUMER_RECORD_LIST_TOPIC = 'KafkaBatchListenerSpec-consumer-records'
public static final String BOOK_CONSUMER_RECORD_LIST_TOPIC = 'KafkaBatchListenerSpec-consumer-record-list'
public static final String BOOK_CONSUMER_RECORDS_TOPIC = 'KafkaBatchListenerSpec-consumer-records'
public static final String BOOKS_HEADERS_TOPIC = 'KafkaBatchListenerSpec-books-headers'
public static final String BOOKS_FLUX_TOPIC = 'KafkaBatchListenerSpec-books-flux'
public static final String BOOKS_FORWARD_LIST_TOPIC = 'KafkaBatchListenerSpec-books-forward-list'
Expand Down Expand Up @@ -169,16 +171,34 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {
}
}

void "test keys and values deserialized to the correct type when receiving a batch of ConsumerRecord"() {
void "test keys and values deserialized to the correct type when receiving a batch with a list of ConsumerRecord"() {
given:
MyBatchClient myBatchClient = context.getBean(MyBatchClient)
BookListener bookListener = context.getBean(BookListener)
bookListener.books?.clear()
bookListener.keys?.clear()

when:
myBatchClient.sendToReceiveAsConsumerRecord("book-1", new Book(title: "The Flowable"))
myBatchClient.sendToReceiveAsConsumerRecord("book-2", new Book(title: "The Shining"))
myBatchClient.sendToReceiveAsListOfConsumerRecord("book-1", new Book(title: "The Flowable"))
myBatchClient.sendToReceiveAsListOfConsumerRecord("book-2", new Book(title: "The Shining"))

then:
conditions.eventually {
bookListener.books == [new Book(title: "The Flowable"), new Book(title: "The Shining")]
bookListener.keys == ["book-1", "book-2"]
}
}

void "test keys and values deserialized to the correct type when receiving a batch of ConsumerRecords"() {
given:
MyBatchClient myBatchClient = context.getBean(MyBatchClient)
BookListener bookListener = context.getBean(BookListener)
bookListener.books?.clear()
bookListener.keys?.clear()

when:
myBatchClient.sendToReceiveAsConsumerRecords("book-1", new Book(title: "The Flowable"))
myBatchClient.sendToReceiveAsConsumerRecords("book-2", new Book(title: "The Shining"))

then:
conditions.eventually {
Expand Down Expand Up @@ -215,7 +235,10 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {
void sendBooksFlux(Flux<Book> books)

@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC)
void sendToReceiveAsConsumerRecord(@KafkaKey String key, @MessageBody Book book)
void sendToReceiveAsListOfConsumerRecord(@KafkaKey String key, @MessageBody Book book)

@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORDS_TOPIC)
void sendToReceiveAsConsumerRecords(@KafkaKey String key, @MessageBody Book book)

}

Expand Down Expand Up @@ -270,9 +293,15 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {
}

@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC)
void receiveConsumerRecords(List<ConsumerRecord<String, Book>> books) {
this.keys.addAll(books.collect { it.key() })
this.books.addAll(books.collect { it.value() })
void receiveListOfConsumerRecord(List<ConsumerRecord<String, Book>> consumerRecords) {
this.keys.addAll(consumerRecords.collect { it.key() })
this.books.addAll(consumerRecords.collect { it.value() })
}

@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORDS_TOPIC)
void receiveConsumerRecords(ConsumerRecords<String, Book> consumerRecords) {
this.keys.addAll(consumerRecords.collect { it.key() })
this.books.addAll(consumerRecords.collect { it.value() })
}
}

Expand Down

0 comments on commit 0e52c61

Please sign in to comment.