Skip to content

Commit

Permalink
Support for ConsumerRecords container type on batch processing
Browse files Browse the repository at this point in the history
  • Loading branch information
rorueda committed Oct 25, 2023
1 parent cf11cdf commit 64de308
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public BatchConsumerRecordsBinderRegistry(ConsumerRecordBinderRegistry consumerR
@Override
public <T> Optional<ArgumentBinder<T, ConsumerRecords<?, ?>>> findArgumentBinder(Argument<T> argument) {
Class<T> argType = argument.getType();

if (ConsumerRecords.class.equals(argType)) {
return Optional.of((context, consumerRecords) -> () -> (Optional<T>) Optional.of(consumerRecords));
}

if (Iterable.class.isAssignableFrom(argType) || argType.isArray() || Publishers.isConvertibleToPublisher(argType)) {
Argument<?> batchType = argument.getFirstTypeVariable().orElse(Argument.OBJECT_ARGUMENT);
List bound = new ArrayList();
Expand All @@ -74,7 +79,6 @@ public BatchConsumerRecordsBinderRegistry(ConsumerRecordBinderRegistry consumerR
if (result.isPresentAndSatisfied()) {
bound.add(result.get());
}

});
}
return () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,20 +518,25 @@ private static Optional<ConsumerRebalanceListener> getConsumerRebalanceListener(
return Optional.empty();
}

private static Argument<?> findBodyArgument(ExecutableMethod<?, ?> method) {
private static Optional<Argument<?>> findBodyArgument(ExecutableMethod<?, ?> method) {
return Arrays.stream(method.getArguments())
.filter(arg -> arg.getType() == ConsumerRecord.class || arg.getAnnotationMetadata().hasAnnotation(MessageBody.class))
.filter(arg -> isConsumerRecord(arg) || arg.getAnnotationMetadata().hasAnnotation(MessageBody.class))
.findFirst()
.orElseGet(() -> Arrays.stream(method.getArguments())
.or(() -> Arrays.stream(method.getArguments())
.filter(arg -> !arg.getAnnotationMetadata().hasStereotype(Bindable.class)
&& !isLastArgumentOfSuspendedMethod(arg, method))
.findFirst()
.orElse(null));
.findFirst());
}

private static Argument<?> findBodyArgument(boolean batch, ExecutableMethod<?, ?> method) {
final Argument<?> tempBodyArg = findBodyArgument(method);
return batch && tempBodyArg != null ? getComponentType(tempBodyArg) : tempBodyArg;
private static Optional<Argument<?>> findBodyArgument(boolean batch, ExecutableMethod<?, ?> method) {
return findBodyArgument(method)
.map(arg -> {
if (isConsumerRecord(arg)) {
return arg;
} else {
return batch ? getComponentType(arg) : arg;
}
});
}

private static boolean isLastArgumentOfSuspendedMethod(Argument<?> argument, ExecutableMethod<?, ?> method) {
Expand All @@ -544,20 +549,20 @@ private static boolean isLastArgumentOfSuspendedMethod(Argument<?> argument, Exe

private void configureDeserializers(final ExecutableMethod<?, ?> method, final DefaultKafkaConsumerConfiguration<?, ?> config) {
final boolean batch = method.isTrue(KafkaListener.class, "batch");
final Argument<?> bodyArgument = findBodyArgument(batch, method);
final Optional<Argument<?>> bodyArgument = findBodyArgument(batch, method);
configureKeyDeserializer(bodyArgument, method, config);
configureValueDeserializer(bodyArgument, config);
debugDeserializationConfiguration(method, config);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private void configureKeyDeserializer(Argument<?> bodyArgument, ExecutableMethod<?, ?> method, DefaultKafkaConsumerConfiguration config) {
private void configureKeyDeserializer(Optional<Argument<?>> bodyArgument, ExecutableMethod<?, ?> method, DefaultKafkaConsumerConfiguration config) {
if (!config.getConfig().containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) && config.getKeyDeserializer().isEmpty()) {
// figure out the Key deserializer
Arrays.stream(method.getArguments())
.filter(arg -> arg.isAnnotationPresent(KafkaKey.class))
.findFirst()
.or(() -> Optional.ofNullable(bodyArgument)
.or(() -> bodyArgument
.filter(KafkaConsumerProcessor::isConsumerRecord)
.flatMap(b -> b.getTypeVariable("K")))
.map(serdeRegistry::pickDeserializer)
Expand All @@ -567,25 +572,26 @@ private void configureKeyDeserializer(Argument<?> bodyArgument, ExecutableMethod
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private void configureValueDeserializer(Argument<?> bodyArgument, DefaultKafkaConsumerConfiguration config) {
private void configureValueDeserializer(Optional<Argument<?>> bodyArgument, DefaultKafkaConsumerConfiguration config) {
if (!config.getConfig().containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) && config.getValueDeserializer().isEmpty()) {
// figure out the Value deserializer
final Optional<Argument<?>> body = Optional.ofNullable(bodyArgument);
body.filter(KafkaConsumerProcessor::isConsumerRecord)
bodyArgument.filter(KafkaConsumerProcessor::isConsumerRecord)
.flatMap(b -> b.getTypeVariable("V"))
.or(() -> body)
.or(() -> bodyArgument)
.map(serdeRegistry::pickDeserializer)
.ifPresentOrElse(config::setValueDeserializer,
() -> config.setValueDeserializer(DEFAULT_VALUE_DESERIALIZER));
}
}

private static boolean isConsumerRecord(@NonNull Argument<?> body) {
return ConsumerRecord.class.isAssignableFrom(body.getType());
return ConsumerRecord.class.isAssignableFrom(body.getType()) ||
ConsumerRecords.class.isAssignableFrom(body.getType());
}

private static Argument<?> getComponentType(final Argument<?> argument) {
final Class<?> argumentType = argument.getType();

return argumentType.isArray()
? Argument.of(argumentType.getComponentType())
: argument.getFirstTypeVariable().orElse(argument);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.messaging.annotation.SendTo
import io.micronaut.serde.annotation.Serdeable
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import reactor.core.publisher.Flux
import spock.lang.Retry

Expand All @@ -20,7 +21,8 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {

public static final String BOOKS_TOPIC = 'KafkaBatchListenerSpec-books'
public static final String BOOKS_LIST_TOPIC = 'KafkaBatchListenerSpec-books-list'
public static final String BOOK_CONSUMER_RECORD_LIST_TOPIC = 'KafkaBatchListenerSpec-consumer-records'
public static final String BOOK_CONSUMER_RECORD_LIST_TOPIC = 'KafkaBatchListenerSpec-consumer-record-list'
public static final String BOOK_CONSUMER_RECORDS_TOPIC = 'KafkaBatchListenerSpec-consumer-records'
public static final String BOOKS_HEADERS_TOPIC = 'KafkaBatchListenerSpec-books-headers'
public static final String BOOKS_FLUX_TOPIC = 'KafkaBatchListenerSpec-books-flux'
public static final String BOOKS_FORWARD_LIST_TOPIC = 'KafkaBatchListenerSpec-books-forward-list'
Expand Down Expand Up @@ -169,16 +171,34 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {
}
}

void "test keys and values deserialized to the correct type when receiving a batch of ConsumerRecord"() {
void "test keys and values deserialized to the correct type when receiving a batch with a list of ConsumerRecord"() {
given:
MyBatchClient myBatchClient = context.getBean(MyBatchClient)
BookListener bookListener = context.getBean(BookListener)
bookListener.books?.clear()
bookListener.keys?.clear()

when:
myBatchClient.sendToReceiveAsConsumerRecord("book-1", new Book(title: "The Flowable"))
myBatchClient.sendToReceiveAsConsumerRecord("book-2", new Book(title: "The Shining"))
myBatchClient.sendToReceiveAsListOfConsumerRecord("book-1", new Book(title: "The Flowable"))
myBatchClient.sendToReceiveAsListOfConsumerRecord("book-2", new Book(title: "The Shining"))

then:
conditions.eventually {
bookListener.books == [new Book(title: "The Flowable"), new Book(title: "The Shining")]
bookListener.keys == ["book-1", "book-2"]
}
}

void "test keys and values deserialized to the correct type when receiving a batch of ConsumerRecords"() {
given:
MyBatchClient myBatchClient = context.getBean(MyBatchClient)
BookListener bookListener = context.getBean(BookListener)
bookListener.books?.clear()
bookListener.keys?.clear()

when:
myBatchClient.sendToReceiveAsConsumerRecords("book-1", new Book(title: "The Flowable"))
myBatchClient.sendToReceiveAsConsumerRecords("book-2", new Book(title: "The Shining"))

then:
conditions.eventually {
Expand Down Expand Up @@ -215,7 +235,10 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {
void sendBooksFlux(Flux<Book> books)

@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC)
void sendToReceiveAsConsumerRecord(@KafkaKey String key, @MessageBody Book book)
void sendToReceiveAsListOfConsumerRecord(@KafkaKey String key, @MessageBody Book book)

@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORDS_TOPIC)
void sendToReceiveAsConsumerRecords(@KafkaKey String key, @MessageBody Book book)

}

Expand Down Expand Up @@ -270,9 +293,15 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {
}

@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC)
void receiveConsumerRecords(List<ConsumerRecord<String, Book>> books) {
this.keys.addAll(books.collect { it.key() })
this.books.addAll(books.collect { it.value() })
void receiveListOfConsumerRecord(List<ConsumerRecord<String, Book>> consumerRecords) {
this.keys.addAll(consumerRecords.collect { it.key() })
this.books.addAll(consumerRecords.collect { it.value() })
}

@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORDS_TOPIC)
void receiveConsumerRecords(ConsumerRecords<String, Book> consumerRecords) {
this.keys.addAll(consumerRecords.collect { it.key() })
this.books.addAll(consumerRecords.collect { it.value() })
}
}

Expand Down

0 comments on commit 64de308

Please sign in to comment.