diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java
index 81c9794b34..42f14be565 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java
@@ -34,6 +34,7 @@
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
 
 import org.springframework.beans.BeanUtils;
@@ -167,6 +168,10 @@ protected AbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory,
 		if (this.containerProperties.getConsumerRebalanceListener() == null) {
 			this.containerProperties.setConsumerRebalanceListener(createSimpleLoggingConsumerRebalanceListener());
 		}
+		final OffsetCommitCallback commitCallback = containerProperties.getCommitCallback();
+		if (commitCallback != null) {
+			this.containerProperties.setCommitCallback(commitCallback, containerProperties.getOffsetAndMetadataProvider());
+		}
 	}
 
 	@Override
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java
index 2de6d46317..02d1253934 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java
@@ -22,6 +22,7 @@
 import java.util.regex.Pattern;
 
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 
 import org.springframework.kafka.support.LogIfLevelEnabled;
@@ -89,6 +90,11 @@ public class ConsumerProperties {
 	 */
 	private OffsetCommitCallback commitCallback;
 
+	/**
+	 * A provider for {@link OffsetAndMetadata}. The provider allows customization of the metadata.
+	 */
+	private OffsetAndMetadataProvider offsetAndMetadataProvider = (listenerMetadata, offset) -> new OffsetAndMetadata(offset);
+
 	/**
 	 * Whether or not to call consumer.commitSync() or commitAsync() when the
 	 * container is responsible for commits. Default true.
@@ -275,6 +281,20 @@ public void setCommitCallback(OffsetCommitCallback commitCallback) {
 		this.commitCallback = commitCallback;
 	}
 
+	/**
+	 * Set the commit callback and an offset and metadata provider; by default, a simple logging callback is
+	 * used to log success at DEBUG level and failures at ERROR level.
+	 * Used when {@link #setSyncCommits(boolean) syncCommits} is false.
+	 * @param commitCallback the callback.
+	 * @param offsetAndMetadataProvider an offset and metadata provider.
+	 * @since 2.8.5
+	 * @see #setSyncCommits(boolean)
+	 */
+	public void setCommitCallback(OffsetCommitCallback commitCallback, OffsetAndMetadataProvider offsetAndMetadataProvider) {
+		this.commitCallback = commitCallback;
+		this.offsetAndMetadataProvider = offsetAndMetadataProvider;
+	}
+
 	/**
 	 * Return the commit callback.
 	 * @return the callback.
 	 */
@@ -284,6 +304,15 @@ public OffsetCommitCallback getCommitCallback() {
 		return this.commitCallback;
 	}
 
+	/**
+	 * Return the offset and metadata provider.
+	 * @return the offset and metadata provider.
+	 */
+	@Nullable
+	public OffsetAndMetadataProvider getOffsetAndMetadataProvider() {
+		return this.offsetAndMetadataProvider;
+	}
+
 	/**
 	 * Set whether or not to call consumer.commitSync() or commitAsync() when the
 	 * container is responsible for commits. Default true.
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index 7c2e783e96..009864d076 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -1438,7 +1438,7 @@ private void fixTxOffsetsIfNeeded() {
 					return;
 				}
 				if (position > oamd.offset()) {
-					toFix.put(tp, new OffsetAndMetadata(position));
+					toFix.put(tp, createOffsetAndMetadata(position));
 				}
 			});
 			if (toFix.size() > 0) {
@@ -1910,7 +1910,7 @@ else if (record.offset() < offs.get(0)) {
 		private void ackImmediate(ConsumerRecord<K, V> record) {
 			Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(
 					new TopicPartition(record.topic(), record.partition()),
-					new OffsetAndMetadata(record.offset() + 1));
+					createOffsetAndMetadata(record.offset() + 1));
 			this.commitLogger.log(() -> COMMITTING + commits);
 			if (this.producer != null) {
 				doSendOffsets(this.producer, commits);
@@ -1926,9 +1926,8 @@ else if (this.syncCommits) {
 		private void ackImmediate(ConsumerRecords<K, V> records) {
 			Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
 			for (TopicPartition part : records.partitions()) {
-				commits.put(part,
-						new OffsetAndMetadata(records.records(part)
-								.get(records.records(part).size() - 1).offset() + 1));
+				commits.put(part, createOffsetAndMetadata(records.records(part)
+						.get(records.records(part).size() - 1).offset() + 1));
 			}
 			this.commitLogger.log(() -> COMMITTING + commits);
 			if (this.producer != null) {
@@ -2694,7 +2693,7 @@ public void ackCurrent(final ConsumerRecord<K, V> record) {
 			if (this.isRecordAck) {
 				Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
 						Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
-								new OffsetAndMetadata(record.offset() + 1));
+								createOffsetAndMetadata(record.offset() + 1));
 				if (this.producer == null) {
 					this.commitLogger.log(() -> COMMITTING + offsetsToCommit);
 					if (this.syncCommits) {
@@ -2996,7 +2995,7 @@ private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
 			for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
 				for (Entry<Integer, Long> offset : entry.getValue().entrySet()) {
 					commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
-							new OffsetAndMetadata(offset.getValue() + 1));
+							createOffsetAndMetadata(offset.getValue() + 1));
 				}
 			}
 			this.offsets.clear();
@@ -3079,6 +3078,26 @@ public String toString() {
 					+ "\n]";
 		}
 
+		private OffsetAndMetadata createOffsetAndMetadata(long offset) {
+			final OffsetAndMetadataProvider metadataProvider = this.containerProperties.getOffsetAndMetadataProvider();
+			return metadataProvider == null
+					? new OffsetAndMetadata(offset)
+					: metadataProvider.provide(new ConsumerAwareListenerMetadata(), offset);
+		}
+
+		private final class ConsumerAwareListenerMetadata implements ListenerMetadata {
+
+			@Override
+			public String getListenerId() {
+				return getBeanName();
+			}
+
+			@Override
+			public String getGroupId() {
+				return ListenerConsumer.this.consumerGroupId;
+			}
+		}
+
 		private final class ConsumerAcknowledgment implements Acknowledgment {
 
 			private final ConsumerRecord<K, V> record;
@@ -3272,8 +3291,7 @@ private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partitions) {
 			for (TopicPartition partition : partitions) {
 				try {
 					if (committed.get(partition) == null) { // no existing commit for this group
-						offsetsToCommit.put(partition,
-								new OffsetAndMetadata(ListenerConsumer.this.consumer.position(partition)));
+						offsetsToCommit.put(partition, createOffsetAndMetadata(ListenerConsumer.this.consumer.position(partition)));
 					}
 				}
 				catch (NoOffsetForPartitionException e) {
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerMetadata.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerMetadata.java
new file mode 100644
index 0000000000..9f93efb096
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerMetadata.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2016-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+/**
+ * Metadata associated with a {@link org.springframework.kafka.annotation.KafkaListener}.
+ *
+ * @author Francois Rosiere
+ * @since 2.8.5
+ * @see org.springframework.kafka.annotation.KafkaListener
+ */
+public interface ListenerMetadata {
+
+	/**
+	 * Return the listener id.
+	 * @return the listener id.
+	 */
+	String getListenerId();
+
+	/**
+	 * Return the group id.
+	 * @return the group id.
+	 */
+	String getGroupId();
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/OffsetAndMetadataProvider.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/OffsetAndMetadataProvider.java
new file mode 100644
index 0000000000..eb1f49148e
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/OffsetAndMetadataProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2016-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+
+/**
+ * Provider for {@link OffsetAndMetadata}. When offsets are committed asynchronously,
+ * the provider can be used in combination with an {@link org.apache.kafka.clients.consumer.OffsetCommitCallback}
+ * to gain finer-grained control over how each {@link OffsetAndMetadata} is created.
+ *
+ * @author Francois Rosiere
+ * @since 2.8.5
+ * @see org.apache.kafka.clients.consumer.OffsetCommitCallback
+ */
+public interface OffsetAndMetadataProvider {
+
+	/**
+	 * Provide an offset and metadata object for the given listener metadata and offset.
+	 *
+	 * @param listenerMetadata metadata associated with a listener.
+	 * @param offset an offset.
+	 * @return an offset and metadata.
+	 */
+	OffsetAndMetadata provide(ListenerMetadata listenerMetadata, long offset);
+}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
index 25f31120f3..1fe51f4e28 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
@@ -26,12 +26,16 @@
 import static org.mockito.BDDMockito.given;
 import static org.mockito.BDDMockito.willAnswer;
 import static org.mockito.BDDMockito.willThrow;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -3844,6 +3848,54 @@ public void clearThreadState(Consumer<?, ?> consumer) {
 		container.stop();
 	}
 
+	@Test
+	public void testOffsetAndMetadataWithoutProvider() throws InterruptedException {
+		testOffsetAndMetadata(null, new OffsetAndMetadata(1));
+	}
+
+	@Test
+	public void testOffsetAndMetadataWithProvider() throws InterruptedException {
+		testOffsetAndMetadata((listenerMetadata, offset) ->
+						new OffsetAndMetadata(offset, listenerMetadata.getGroupId()),
+				new OffsetAndMetadata(1, "grp"));
+	}
+
+	@SuppressWarnings("unchecked")
+	private void testOffsetAndMetadata(OffsetAndMetadataProvider provider,
+			OffsetAndMetadata expectedOffsetAndMetadata) throws InterruptedException {
+		final ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
+		final Consumer<Integer, String> consumer = mock(Consumer.class);
+		given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
+		given(consumer.poll(any(Duration.class))).willAnswer(i -> new ConsumerRecords<>(
+				Map.of(
+						new TopicPartition("foo", 0),
+						Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, 1, "foo"))
+				)
+		));
+		final ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> offsetsCaptor = ArgumentCaptor.forClass(Map.class);
+		final CountDownLatch latch = new CountDownLatch(1);
+		doAnswer(invocation -> {
+			latch.countDown();
+			return null;
+		}).when(consumer).commitAsync(offsetsCaptor.capture(), any());
+		final ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset("foo", 0));
+		containerProps.setGroupId("grp");
+		containerProps.setClientId("clientId");
+		containerProps.setSyncCommits(false);
+		containerProps.setMessageListener((MessageListener<Integer, String>) data -> {
+		});
+		containerProps.setCommitCallback((offsets, exception) -> {
+		}, provider);
+		final KafkaMessageListenerContainer<Integer, String> container =
+				new KafkaMessageListenerContainer<>(cf, containerProps);
+		container.start();
+		assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(offsetsCaptor.getValue())
+				.hasSize(1)
+				.containsValue(expectedOffsetAndMetadata);
+		container.stop();
+	}
+
 	private Consumer<Integer, String> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
 		Consumer<Integer, String> consumer = KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer",
 				Consumer.class);
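A usage sketch for reviewers, not part of the patch: it shows how the new two-argument setCommitCallback introduced above can pair an OffsetCommitCallback with an OffsetAndMetadataProvider so that listener metadata (here the group id) is recorded with the committed offsets. The class name, topic, and group below are illustrative only.

```java
import org.apache.kafka.clients.consumer.OffsetAndMetadata;

import org.springframework.kafka.listener.ContainerProperties;

// Hypothetical configuration snippet, assuming the setCommitCallback(callback, provider)
// overload added by this change.
public class OffsetMetadataExample {

	static ContainerProperties containerProperties() {
		ContainerProperties props = new ContainerProperties("someTopic"); // illustrative topic
		props.setGroupId("someGroup"); // illustrative group
		// The commit callback is only invoked for asynchronous commits.
		props.setSyncCommits(false);
		props.setCommitCallback(
				(offsets, exception) -> {
					// custom handling of async commit success/failure goes here
				},
				// OffsetAndMetadataProvider: fold the listener's group id into the commit metadata.
				(listenerMetadata, offset) -> new OffsetAndMetadata(offset, listenerMetadata.getGroupId()));
		return props;
	}

}
```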