Skip to content

Commit

Permalink
Support inferring deserializers correctly for batch listeners receiving Co…
Browse files Browse the repository at this point in the history
…nsumerRecords

It looks like when using a batch listener with a parameter of List&lt;ConsumerRecord&lt;K,V&gt;&gt;, the framework picks a List deserializer instead of looking at the ConsumerRecord type arguments. With this change, it will look at the type arguments of the ConsumerRecord to determine the Kafka message deserializer.
  • Loading branch information
dhofftgt committed Nov 22, 2022
1 parent bb65c65 commit bcee29b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,11 @@ private static Argument<?> findBodyArgument(ExecutableMethod<?, ?> method) {
private void configureDeserializers(final ExecutableMethod<?, ?> method, final DefaultKafkaConsumerConfiguration consumerConfiguration) {
final Properties properties = consumerConfiguration.getConfig();
// figure out the Key deserializer
final Argument<?> bodyArgument = findBodyArgument(method);
boolean batch = method.isTrue(KafkaListener.class, "batch");

Argument<?> tempBodyArg = findBodyArgument(method);

final Argument<?> bodyArgument = batch && tempBodyArg != null ? getComponentType(tempBodyArg) : tempBodyArg;

if (!properties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) && !consumerConfiguration.getKeyDeserializer().isPresent()) {
final Optional<Argument<?>> keyArgument = Arrays.stream(method.getArguments())
Expand Down Expand Up @@ -956,8 +960,7 @@ private void configureDeserializers(final ExecutableMethod<?, ?> method, final D
consumerConfiguration.setValueDeserializer(new StringDeserializer());
}
} else {
final boolean batch = method.isTrue(KafkaListener.class, "batch");
consumerConfiguration.setValueDeserializer(serdeRegistry.pickDeserializer(batch ? getComponentType(bodyArgument) : bodyArgument));
consumerConfiguration.setValueDeserializer(serdeRegistry.pickDeserializer(bodyArgument));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec
import io.micronaut.context.annotation.Requires
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.messaging.annotation.MessageHeader
import io.micronaut.messaging.annotation.SendTo
import io.micronaut.serde.annotation.Serdeable
import io.reactivex.Flowable
import org.apache.kafka.clients.consumer.ConsumerRecord
import reactor.core.publisher.Flux
import spock.lang.Retry

Expand All @@ -19,6 +21,8 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {

public static final String BOOKS_TOPIC = 'KafkaBatchListenerSpec-books'
public static final String BOOKS_LIST_TOPIC = 'KafkaBatchListenerSpec-books-list'
public static final String BOOK_CONSUMER_RECORD_LIST_TOPIC = 'KafkaBatchListenerSpec-consumer-records'
public static final String BOOKS_AND_KEYS_LIST_TOPIC = 'KafkaBatchListenerSpec-book-and-keys-list'
public static final String BOOKS_HEADERS_TOPIC = 'KafkaBatchListenerSpec-books-headers'
public static final String BOOKS_FLUX_TOPIC = 'KafkaBatchListenerSpec-books-flux'
public static final String BOOKS_FLOWABLE_TOPIC = 'KafkaBatchListenerSpec-books-flowable'
Expand Down Expand Up @@ -204,6 +208,24 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {
}
}

void "test keys and values deserialized to the correct type when receiving a batch of ConsumerRecord"() {
given: "a batch client and a listener with freshly cleared state"
def client = context.getBean(MyBatchClient)
def listener = context.getBean(BookListener)
listener.books?.clear()
listener.keys?.clear()

when: "two keyed books are published to the consumer-record topic"
client.sendToReceiveAsConsumerRecord("book-1", new Book(title: "The Flowable"))
client.sendToReceiveAsConsumerRecord("book-2", new Book(title: "The Shining"))

then: "keys and values both arrive deserialized to their declared types"
conditions.eventually {
listener.keys == ["book-1", "book-2"]
listener.books == [new Book(title: "The Flowable"), new Book(title: "The Shining")]
}
}

@Requires(property = 'spec.name', value = 'KafkaBatchListenerSpec')
@KafkaClient(batch = true)
@Topic(KafkaBatchListenerSpec.BOOKS_TOPIC)
Expand Down Expand Up @@ -236,13 +258,18 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {

@Topic(KafkaBatchListenerSpec.BOOKS_FLOWABLE_TOPIC)
void sendBooksFlowable(Flowable<Book> books)

// Publishes one keyed Book record; the corresponding listener method
// (BookListener#receiveConsumerRecords) consumes these in batches as
// List<ConsumerRecord<String, Book>>, exercising the deserializer inference fix.
@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC)
void sendToReceiveAsConsumerRecord(@KafkaKey String key, @MessageBody Book book)

}

@Requires(property = 'spec.name', value = 'KafkaBatchListenerSpec')
@KafkaListener(batch = true, offsetReset = EARLIEST)
@Topic(KafkaBatchListenerSpec.BOOKS_TOPIC)
static class BookListener {
List<Book> books = []
List<String> keys = []
List<String> headers = []

@Topic(KafkaBatchListenerSpec.BOOKS_LIST_TOPIC)
Expand Down Expand Up @@ -298,6 +325,12 @@ class KafkaBatchListenerSpec extends AbstractKafkaContainerSpec {
void receiveFlowable(Flowable<Book> books) {
this.books.addAll books.toList().blockingGet()
}

@Topic(KafkaBatchListenerSpec.BOOK_CONSUMER_RECORD_LIST_TOPIC)
void receiveConsumerRecords(List<ConsumerRecord<String, Book>> records) {
// Record both the key and the value of every received record so the
// spec can assert that each was deserialized to its declared type.
records.each { record ->
keys << record.key()
books << record.value()
}
}
}

@Requires(property = 'spec.name', value = 'KafkaBatchListenerSpec')
Expand Down

0 comments on commit bcee29b

Please sign in to comment.