diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index 121292d2b6..fce1564294 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -36,6 +36,7 @@ import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.GenericErrorHandler; +import org.springframework.kafka.listener.RecordInterceptor; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer; import org.springframework.kafka.requestreply.ReplyingKafkaOperations; @@ -98,6 +99,8 @@ public abstract class AbstractKafkaListenerContainerFactory recordInterceptor; + /** * Specify a {@link ConsumerFactory} to use. * @param consumerFactory The consumer factory. @@ -280,6 +283,16 @@ public ContainerProperties getContainerProperties() { return this.containerProperties; } + /** + * Set an interceptor to be called before calling the listener. + * Does not apply to batch listeners. + * @param recordInterceptor the interceptor. + * @since 2.2.7 + */ + public void setRecordInterceptor(RecordInterceptor recordInterceptor) { + this.recordInterceptor = recordInterceptor; + } + @Override public void afterPropertiesSet() { if (this.errorHandler != null) { @@ -356,6 +369,7 @@ protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) { else if (this.autoStartup != null) { instance.setAutoStartup(this.autoStartup); } + instance.setRecordInterceptor(this.recordInterceptor); JavaUtils.INSTANCE .acceptIfNotNull(this.phase, instance::setPhase) .acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index b328b18e3b..323374c6e4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -89,6 +89,8 @@ public abstract class AbstractMessageListenerContainer private int topicCheckTimeout = DEFAULT_TOPIC_CHECK_TIMEOUT; + private RecordInterceptor recordInterceptor; + private volatile boolean running = false; private volatile boolean paused; @@ -279,6 +281,20 @@ public void setTopicCheckTimeout(int topicCheckTimeout) { this.topicCheckTimeout = topicCheckTimeout; } + protected RecordInterceptor getRecordInterceptor() { + return this.recordInterceptor; + } + + /** + * Set an interceptor to be called before calling the listener. + * Does not apply to batch listeners. + * @param recordInterceptor the interceptor. + * @since 2.2.7 + */ + public void setRecordInterceptor(RecordInterceptor recordInterceptor) { + this.recordInterceptor = recordInterceptor; + } + @Override public void setupMessageListener(Object messageListener) { this.containerProperties.setMessageListener(messageListener); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java index 80e7ab9076..47dc122f44 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java @@ -161,6 +161,7 @@ protected void doStart() { container.setClientIdSuffix("-" + i); container.setGenericErrorHandler(getGenericErrorHandler()); container.setAfterRollbackProcessor(getAfterRollbackProcessor()); + container.setRecordInterceptor(getRecordInterceptor()); container.setEmergencyStop(() -> { stop(() -> { // NOSONAR diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 9b0b0c99d9..0860e3d9a5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -466,6 +466,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final Duration syncCommitTimeout; + private final RecordInterceptor recordInterceptor = getRecordInterceptor(); + private Map definedPartitions; private volatile Collection assignedPartitions; @@ -1308,26 +1310,35 @@ private void invokeOnMessage(final ConsumerRecord record, ackCurrent(record, producer); } - private void doInvokeOnMessage(final ConsumerRecord record) { - switch (this.listenerType) { - case ACKNOWLEDGING_CONSUMER_AWARE: - this.listener.onMessage(record, - this.isAnyManualAck - ? new ConsumerAcknowledgment(record) - : null, this.consumer); - break; - case CONSUMER_AWARE: - this.listener.onMessage(record, this.consumer); - break; - case ACKNOWLEDGING: - this.listener.onMessage(record, - this.isAnyManualAck - ? new ConsumerAcknowledgment(record) - : null); - break; - case SIMPLE: - this.listener.onMessage(record); - break; + private void doInvokeOnMessage(final ConsumerRecord recordArg) { + ConsumerRecord record = recordArg; + if (this.recordInterceptor != null) { + record = this.recordInterceptor.intercept(record); + } + if (record == null) { + this.logger.debug(() -> "RecordInterceptor returned null, skipping: " + recordArg); + } + else { + switch (this.listenerType) { + case ACKNOWLEDGING_CONSUMER_AWARE: + this.listener.onMessage(record, + this.isAnyManualAck + ? new ConsumerAcknowledgment(record) + : null, this.consumer); + break; + case CONSUMER_AWARE: + this.listener.onMessage(record, this.consumer); + break; + case ACKNOWLEDGING: + this.listener.onMessage(record, + this.isAnyManualAck + ? new ConsumerAcknowledgment(record) + : null); + break; + case SIMPLE: + this.listener.onMessage(record); + break; + } } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java new file mode 100644 index 0000000000..cc06f886a1 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java @@ -0,0 +1,46 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import org.springframework.lang.Nullable; + +/** + * An interceptor for {@link ConsumerRecord} invoked by the listener + * container before invoking the listener. + * + * @param the key type. + * @param the value type. + * + * @author Gary Russell + * @since 2.2.7 + * + */ +@FunctionalInterface +public interface RecordInterceptor { + + /** + * Perform some action on the record or return a different one. + * If null is returned the record will be skipped. + * @param record the record. + * @return the record or null. + */ + @Nullable + ConsumerRecord intercept(ConsumerRecord record); + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index ce4f5ae7e6..dd35416685 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -746,6 +746,7 @@ public void testKeyConversion() throws Exception { this.bytesKeyTemplate.send("annotated36", "foo".getBytes(), "bar"); assertThat(this.listener.keyLatch.await(30, TimeUnit.SECONDS)).isTrue(); assertThat(this.listener.convertedKey).isEqualTo("foo"); + assertThat(this.config.intercepted).isTrue(); } @Test @@ -761,7 +762,11 @@ public void testProjection() throws InterruptedException { @EnableTransactionManagement(proxyTargetClass = true) public static class Config implements KafkaListenerConfigurer { - private final CountDownLatch spyLatch = new CountDownLatch(2); + final CountDownLatch spyLatch = new CountDownLatch(2); + + volatile Throwable globalErrorThrowable; + + volatile boolean intercepted; @Bean public static PropertySourcesPlaceholderConfigurer ppc() { @@ -784,8 +789,6 @@ public ChainedKafkaTransactionManager cktm() { return new ChainedKafkaTransactionManager<>(ktm(), transactionManager()); } - private Throwable globalErrorThrowable; - @Bean public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { @@ -884,6 +887,10 @@ public KafkaListenerContainerFactory bytesStringListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(bytesStringConsumerFactory()); + factory.setRecordInterceptor(record -> { + this.intercepted = true; + return record; + }); return factory; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index af2bc14d87..73dfa8cbfe 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -124,11 +124,13 @@ protected KafkaConsumer createKafkaConsumer(String groupId, Str ContainerProperties containerProps = new ContainerProperties(topic1); containerProps.setLogContainerConfig(true); - final CountDownLatch latch = new CountDownLatch(4); + final CountDownLatch latch = new CountDownLatch(3); final Set listenerThreadNames = new ConcurrentSkipListSet<>(); + final List payloads = new ArrayList<>(); containerProps.setMessageListener((MessageListener) message -> { ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message); listenerThreadNames.add(Thread.currentThread().getName()); + payloads.add(message.value()); latch.countDown(); }); @@ -144,6 +146,11 @@ protected KafkaConsumer createKafkaConsumer(String groupId, Str stopLatch.countDown(); } }); + CountDownLatch intercepted = new CountDownLatch(4); + container.setRecordInterceptor(record -> { + intercepted.countDown(); + return record.value().equals("baz") ? null : record; + }); container.start(); ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); @@ -158,6 +165,7 @@ protected KafkaConsumer createKafkaConsumer(String groupId, Str template.sendDefault(0, "baz"); template.sendDefault(2, "qux"); template.flush(); + assertThat(intercepted.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); for (String threadName : listenerThreadNames) { assertThat(threadName).contains("-C-"); @@ -173,6 +181,7 @@ protected KafkaConsumer createKafkaConsumer(String groupId, Str Set> children = new HashSet<>(containers); container.stop(); assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(payloads).containsExactlyInAnyOrder("foo", "bar", "qux"); events.forEach(e -> { assertThat(e.getContainer(MessageListenerContainer.class)).isSameAs(container); if (e instanceof ContainerStoppedEvent) { diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 7667543057..879393da21 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -753,6 +753,10 @@ Two `MessageListenerContainer` implementations are provided: The `KafkaMessageListenerContainer` receives all message from all topics or partitions on a single thread. The `ConcurrentMessageListenerContainer` delegates to one or more `KafkaMessageListenerContainer` instances to provide multi-threaded consumption. +Starting with version 2.1.7, you can add a `RecordInterceptor` to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record. +If the interceptor returns null, the listener is not called. +The interceptor is not invoked when the listener is a <>. + [[kafka-container]] ====== Using `KafkaMessageListenerContainer` diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 8090ba127d..ee5b0f3bb7 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -34,6 +34,9 @@ See <> for more information. It is now possible to obtain the consumer's `group.id` property in the listener method. See <> for more information. +The container has a new property `recordInterceptor` allowing records to be inspected or modified before invoking the listener. +See <> for more information. + ==== ErrorHandler Changes The `SeekToCurrentErrorHandler` now treats certain exceptions as fatal and disables retry for those, invoking the recoverer on first failure.