Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1509,6 +1509,9 @@ public KafkaListenerContainerFactory<?> batchFactory() {
NOTE: Starting with version 2.8, you can override the factory's `batchListener` propery using the `batch` property on the `@KafkaListener` annotation.
This, together with the changes to <<error-handlers>> allows the same factory to be used for both record and batch listeners.

NOTE: Starting with version 2.9.6, the container factory has separate setters for the `recordMessageConverter` and `batchMessageConverter` properties.
Previously, there was only one property `messageConverter` which applied to both record and batch listeners.

The following example shows how to receive a list of payloads:

====
Expand Down Expand Up @@ -4395,7 +4398,7 @@ public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new JsonMessageConverter());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
Expand Down Expand Up @@ -4677,7 +4680,7 @@ public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2022 the original author or authors.
* Copyright 2014-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,7 +47,9 @@
import org.springframework.kafka.requestreply.ReplyingKafkaOperations;
import org.springframework.kafka.support.JavaUtils;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -82,7 +84,9 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe

private Integer phase;

private MessageConverter messageConverter;
private RecordMessageConverter recordMessageConverter;

private BatchMessageConverter batchMessageConverter;

private RecordFilterStrategy<? super K, ? super V> recordFilterStrategy;

Expand Down Expand Up @@ -154,9 +158,38 @@ public void setPhase(int phase) {
/**
* Set the message converter to use if dynamic argument type matching is needed.
* @param messageConverter the converter.
* @deprecated since 2.9.6 in favor of
* {@link #setBatchMessageConverter(BatchMessageConverter)} and
* {@link #setRecordMessageConverter(RecordMessageConverter)}.
*/
@Deprecated
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
if (messageConverter instanceof RecordMessageConverter) {
setRecordMessageConverter((RecordMessageConverter) messageConverter);
}
else {
setBatchMessageConverter((BatchMessageConverter) messageConverter);
}
}

/**
* Set the message converter to use if dynamic argument type matching is needed for
* record listeners.
* @param recordMessageConverter the converter.
* @since 2.9.6
*/
public void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) {
this.recordMessageConverter = recordMessageConverter;
}

/**
* Set the message converter to use if dynamic argument type matching is needed for
* batch listeners.
* @param batchMessageConverter the converter.
* @since 2.9.6
*/
public void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) {
this.batchMessageConverter = batchMessageConverter;
}

/**
Expand Down Expand Up @@ -391,7 +424,12 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
}

endpoint.setupListenerContainer(instance, this.messageConverter);
if (Boolean.TRUE.equals(endpoint.getBatchListener())) {
endpoint.setupListenerContainer(instance, this.batchMessageConverter);
}
else {
endpoint.setupListenerContainer(instance, this.recordMessageConverter);
}
initializeContainer(instance, endpoint);
customizeContainer(instance);
return instance;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2022 the original author or authors.
* Copyright 2014-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -282,6 +282,7 @@ public boolean isBatchListener() {
* @return the batch listener flag.
* @since 2.8
*/
@Override
@Nullable
public Boolean getBatchListener() {
return this.batchListener;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -165,4 +165,15 @@ default String getMainListenerId() {
return null;
}

/**
* Return the current batch listener flag for this endpoint, or null if not explicitly
* set.
* @return the batch listener flag.
* @since 2.9.6
*/
@Nullable
default Boolean getBatchListener() {
return null;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -173,7 +173,7 @@ public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(EmbeddedKa
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(embeddedKafka));
factory.setBatchListener(true);
factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
factory.setReplyTemplate(template(embeddedKafka));
DefaultErrorHandler eh = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template));
factory.setCommonErrorHandler(eh);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1091,7 +1091,7 @@ public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record,
}
});
factory.getContainerProperties().setMicrometerTags(Collections.singletonMap("extraTag", "foo"));
factory.setMessageConverter(new RecordMessageConverter() {
factory.setRecordMessageConverter(new RecordMessageConverter() {

@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Expand Down Expand Up @@ -1137,7 +1137,7 @@ public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.addTrustedPackages("*");
converter.setTypeMapper(typeMapper);
factory.setMessageConverter(converter);
factory.setRecordMessageConverter(converter);
return factory;
}

Expand All @@ -1154,7 +1154,7 @@ public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory2() {
typeMapper.addTrustedPackages("*");
typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID);
converter.setTypeMapper(typeMapper);
factory.setMessageConverter(converter);
factory.setRecordMessageConverter(converter);
return factory;
}

Expand All @@ -1167,7 +1167,7 @@ public KafkaListenerContainerFactory<?> projectionListenerContainerFactory() {
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.addTrustedPackages("*");
converter.setTypeMapper(typeMapper);
factory.setMessageConverter(new ProjectingMessageConverter(converter));
factory.setRecordMessageConverter(new ProjectingMessageConverter(converter));
factory.setChangeConsumerThreadName(true);
factory.setThreadNameSupplier(container -> "foo." + container.getListenerId());
return factory;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -165,7 +165,7 @@ public ConsumerFactory consumerFactory() {
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new BatchMessagingMessageConverter(new JsonMessageConverter()));
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(new JsonMessageConverter()));
factory.setBatchListener(true);
return factory;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -900,7 +900,7 @@ public ConcurrentKafkaListenerContainerFactory<Integer, String> simpleMapperFact
factory.setReplyTemplate(template());
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
messageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
factory.setMessageConverter(messageConverter);
factory.setRecordMessageConverter(messageConverter);
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {

@Override
Expand Down