diff --git a/spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/dynamic/Application.kt b/spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/dynamic/Application.kt index 51960cbc30..12b96430af 100644 --- a/spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/dynamic/Application.kt +++ b/spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/dynamic/Application.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022-2024 the original author or authors. + * Copyright 2022-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -60,8 +60,8 @@ private fun createContainer( factory: ConcurrentKafkaListenerContainerFactory, topic: String, group: String ): ConcurrentMessageListenerContainer { val container = factory.createContainer(topic) - container.containerProperties.messageListener = MyListener() - container.containerProperties.groupId = group + container.containerProperties.setMessageListener(MyListener()) + container.containerProperties.setGroupId(group) container.beanName = group container.start() return container @@ -104,9 +104,10 @@ fun pojo(id: String, topic: String): MyPojo { // tag::listener[] -class MyListener : MessageListener { +class MyListener : MessageListener { + + override fun onMessage(data: ConsumerRecord) { - override fun onMessage(data: ConsumerRecord) { // ... } diff --git a/spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/requestreply/Application.kt b/spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/requestreply/Application.kt index d9dfa497d8..33553e97e2 100644 --- a/spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/requestreply/Application.kt +++ b/spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/requestreply/Application.kt @@ -74,7 +74,7 @@ class Application { factory: ConcurrentKafkaListenerContainerFactory ): ReplyingKafkaTemplate { val replyContainer = factory.createContainer("replies") - replyContainer.containerProperties.groupId = "request.replies" + replyContainer.containerProperties.setGroupId("request.replies") val template = ReplyingKafkaTemplate(pf, replyContainer) template.messageConverter = ByteArrayJsonMessageConverter() template.setDefaultTopic("requests") diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java index 2f81325693..5b8759ceae 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java @@ -321,6 +321,7 @@ interface ProducerCallback { */ interface OperationsCallback { + @Nullable T doInOperations(KafkaOperations operations); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 11c047f269..78a6c315d6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -660,7 +660,7 @@ public T execute(ProducerCallback callback) { } @Override - public T executeInTransaction(OperationsCallback callback) { + public @Nullable T executeInTransaction(OperationsCallback callback) { Assert.notNull(callback, "'callback' cannot be null"); Assert.state(this.transactional, "Producer factory does not support transactions"); Thread currentThread = Thread.currentThread(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppingEvent.java b/spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppingEvent.java index 067e7347eb..a05e2fbb92 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppingEvent.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppingEvent.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; +import org.jspecify.annotations.Nullable; /** * An event published when a consumer is stopped. While it is best practice to use @@ -37,7 +38,7 @@ public class ConsumerStoppingEvent extends KafkaEvent { private transient final Consumer consumer; - private transient final Collection partitions; + private transient final @Nullable Collection partitions; /** * Construct an instance with the provided source, consumer and partitions. @@ -48,7 +49,7 @@ public class ConsumerStoppingEvent extends KafkaEvent { * @since 2.2.1 */ public ConsumerStoppingEvent(Object source, Object container, - Consumer consumer, Collection partitions) { + Consumer consumer, @Nullable Collection partitions) { super(source, container); this.consumer = consumer; this.partitions = partitions; @@ -58,7 +59,7 @@ public ConsumerStoppingEvent(Object source, Object container, return this.consumer; } - public Collection getPartitions() { + public @Nullable Collection getPartitions() { return this.partitions; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java index 00def6c050..278d06852a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java @@ -65,21 +65,23 @@ public void onPartitionsAssigned(Map assignments, Consumer } @Override - public void onPartitionsRevoked(Collection partitions) { - partitions.forEach(tp -> { - List removedCallbacks = this.topicToCallbacks.remove(tp); - if (removedCallbacks != null && !removedCallbacks.isEmpty()) { - removedCallbacks.forEach(cb -> { - List topics = this.callbackToTopics.get(cb); - if (topics != null) { - topics.remove(tp); - if (topics.isEmpty()) { - this.callbackToTopics.remove(cb); + public void onPartitionsRevoked(@Nullable Collection partitions) { + if (partitions != null) { + partitions.forEach(tp -> { + List removedCallbacks = this.topicToCallbacks.remove(tp); + if (removedCallbacks != null && !removedCallbacks.isEmpty()) { + removedCallbacks.forEach(cb -> { + List topics = this.callbackToTopics.get(cb); + if (topics != null) { + topics.remove(tp); + if (topics.isEmpty()) { + this.callbackToTopics.remove(cb); + } } - } - }); - } - }); + }); + } + }); + } } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java index c3f53491b3..03281cb8ec 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,10 @@ package org.springframework.kafka.listener; +import java.util.Objects; + +import org.jspecify.annotations.Nullable; + import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.kafka.config.KafkaListenerConfigUtils; @@ -32,9 +36,9 @@ public abstract class AbstractKafkaBackOffManagerFactory implements KafkaBackOffManagerFactory, ApplicationContextAware { - private ApplicationContext applicationContext; + private @Nullable ApplicationContext applicationContext; - private ListenerContainerRegistry listenerContainerRegistry; + private @Nullable ListenerContainerRegistry listenerContainerRegistry; /** * Creates an instance that will retrieve the {@link ListenerContainerRegistry} from @@ -83,7 +87,7 @@ private ListenerContainerRegistry getListenerContainerFromContext() { } protected T getBean(String beanName, Class beanClass) { - return this.applicationContext.getBean(beanName, beanClass); + return Objects.requireNonNull(this.applicationContext).getBean(beanName, beanClass); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 1d8083e121..b4b3dc8d4e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -101,9 +101,9 @@ public abstract class AbstractMessageListenerContainer @NonNull private String beanName = "noBeanNameSet"; - private ApplicationEventPublisher applicationEventPublisher; + private @Nullable ApplicationEventPublisher applicationEventPublisher; - private CommonErrorHandler commonErrorHandler; + private @Nullable CommonErrorHandler commonErrorHandler; private boolean autoStartup = true; @@ -114,15 +114,16 @@ public abstract class AbstractMessageListenerContainer private int topicCheckTimeout = DEFAULT_TOPIC_CHECK_TIMEOUT; - private RecordInterceptor recordInterceptor; + private @Nullable RecordInterceptor recordInterceptor; - private BatchInterceptor batchInterceptor; + private @Nullable BatchInterceptor batchInterceptor; private boolean interceptBeforeTx = true; + @SuppressWarnings("NullAway.Init") private byte[] listenerInfo; - private ApplicationContext applicationContext; + private @Nullable ApplicationContext applicationContext; private volatile boolean running = false; @@ -149,13 +150,13 @@ public abstract class AbstractMessageListenerContainer * @param containerProperties the properties. */ @SuppressWarnings("unchecked") - protected AbstractMessageListenerContainer(ConsumerFactory consumerFactory, + protected AbstractMessageListenerContainer(@Nullable ConsumerFactory consumerFactory, ContainerProperties containerProperties) { Assert.notNull(containerProperties, "'containerProperties' cannot be null"); Assert.notNull(consumerFactory, "'consumerFactory' cannot be null"); this.consumerFactory = (ConsumerFactory) consumerFactory; - String[] topics = containerProperties.getTopics(); + @Nullable String @Nullable [] topics = containerProperties.getTopics(); if (topics != null) { this.containerProperties = new ContainerProperties(topics); } @@ -165,7 +166,7 @@ protected AbstractMessageListenerContainer(ConsumerFactory this.containerProperties = new ContainerProperties(topicPattern); } else { - TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions(); + @Nullable TopicPartitionOffset @Nullable [] topicPartitions = containerProperties.getTopicPartitions(); if (topicPartitions != null) { this.containerProperties = new ContainerProperties(topicPartitions); } @@ -370,8 +371,8 @@ public String getMainListenerId() { return this.mainListenerId; } - @Nullable @Override + @SuppressWarnings("NullAway") // Dataflow analysis limitation public byte[] getListenerInfo() { return this.listenerInfo != null ? Arrays.copyOf(this.listenerInfo, this.listenerInfo.length) : null; } @@ -382,6 +383,7 @@ public byte[] getListenerInfo() { * @param listenerInfo the info. * @since 2.8.4 */ + @SuppressWarnings("NullAway") // Dataflow analysis limitation public void setListenerInfo(@Nullable byte[] listenerInfo) { this.listenerInfo = listenerInfo != null ? Arrays.copyOf(listenerInfo, listenerInfo.length) : null; } @@ -458,7 +460,7 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) { this.kafkaAdmin = kafkaAdmin; } - protected RecordInterceptor getRecordInterceptor() { + protected @Nullable RecordInterceptor getRecordInterceptor() { return this.recordInterceptor; } @@ -469,11 +471,11 @@ protected RecordInterceptor getRecordInterceptor() { * @since 2.2.7 * @see #setInterceptBeforeTx(boolean) */ - public void setRecordInterceptor(RecordInterceptor recordInterceptor) { + public void setRecordInterceptor(@Nullable RecordInterceptor recordInterceptor) { this.recordInterceptor = recordInterceptor; } - protected BatchInterceptor getBatchInterceptor() { + protected @Nullable BatchInterceptor getBatchInterceptor() { return this.batchInterceptor; } @@ -483,7 +485,7 @@ protected BatchInterceptor getBatchInterceptor() { * @since 2.6.6 * @see #setInterceptBeforeTx(boolean) */ - public void setBatchInterceptor(BatchInterceptor batchInterceptor) { + public void setBatchInterceptor(@Nullable BatchInterceptor batchInterceptor) { this.batchInterceptor = batchInterceptor; } @@ -541,7 +543,7 @@ protected void checkTopics() { List missing = null; try (AdminClient client = AdminClient.create(configs)) { // NOSONAR - false positive null check if (client != null) { - String[] topics = this.containerProperties.getTopics(); + @Nullable String @Nullable[] topics = this.containerProperties.getTopics(); if (topics == null) { topics = Arrays.stream(this.containerProperties.getTopicPartitions()) .map(TopicPartitionOffset::getTopic) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingConsumerAwareMessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingConsumerAwareMessageListener.java index 1749a8252e..7a60309b96 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingConsumerAwareMessageListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingConsumerAwareMessageListener.java @@ -47,6 +47,6 @@ default void onMessage(ConsumerRecord data) { } @Override - void onMessage(ConsumerRecord data, @Nullable Acknowledgment acknowledgment, Consumer consumer); + void onMessage(ConsumerRecord data, @Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/BackOffHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/BackOffHandler.java index d6104626ad..5381260a8d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/BackOffHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/BackOffHandler.java @@ -35,7 +35,7 @@ public interface BackOffHandler { * @param exception the exception. * @param nextBackOff the next back off. */ - default void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) { + default void onNextBackOff(@Nullable MessageListenerContainer container, @Nullable Exception exception, long nextBackOff) { throw new UnsupportedOperationException(); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchAcknowledgingConsumerAwareMessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchAcknowledgingConsumerAwareMessageListener.java index 63e4c2cc6b..9e1f788f3d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchAcknowledgingConsumerAwareMessageListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchAcknowledgingConsumerAwareMessageListener.java @@ -51,6 +51,6 @@ default void onMessage(List> data) { } @Override - void onMessage(List> data, @Nullable Acknowledgment acknowledgment, Consumer consumer); + void onMessage(List> data, @Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchAcknowledgingMessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchAcknowledgingMessageListener.java index 09474ab0ae..2c5b223034 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchAcknowledgingMessageListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchAcknowledgingMessageListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.jspecify.annotations.Nullable; import org.springframework.kafka.support.Acknowledgment; @@ -49,6 +50,6 @@ default void onMessage(List> data) { } @Override - void onMessage(List> data, Acknowledgment acknowledgment); + void onMessage(List> data, @Nullable Acknowledgment acknowledgment); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchConsumerAwareMessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchConsumerAwareMessageListener.java index a52f320e33..3568eb6708 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchConsumerAwareMessageListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchConsumerAwareMessageListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.jspecify.annotations.Nullable; /** * Listener for handling a batch of incoming Kafka messages; the list @@ -47,6 +48,6 @@ default void onMessage(List> data) { } @Override - void onMessage(List> data, Consumer consumer); + void onMessage(List> data, @Nullable Consumer consumer); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java index f8c331c430..589e543876 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/BatchListenerFailedException.java @@ -36,7 +36,7 @@ public class BatchListenerFailedException extends KafkaException { private final int index; - private transient ConsumerRecord record; + private transient @Nullable ConsumerRecord record; /** * Construct an instance with the provided properties. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeBatchInterceptor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeBatchInterceptor.java index 488fd70a0b..1fa48006d6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeBatchInterceptor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeBatchInterceptor.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.jspecify.annotations.Nullable; import org.springframework.util.Assert; @@ -53,7 +54,7 @@ public CompositeBatchInterceptor(BatchInterceptor... delegates) { } @Override - public ConsumerRecords intercept(ConsumerRecords records, Consumer consumer) { + public @Nullable ConsumerRecords intercept(ConsumerRecords records, Consumer consumer) { ConsumerRecords recordsToIntercept = records; for (BatchInterceptor delegate : this.delegates) { recordsToIntercept = delegate.intercept(recordsToIntercept, consumer); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java index 6b1bb23cd8..5d75f93db6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java @@ -73,7 +73,7 @@ public class ConcurrentMessageListenerContainer extends AbstractMessageLis private boolean alwaysClientIdSuffix = true; - private volatile Reason reason; + private volatile @Nullable Reason reason; /** * Construct an instance with the supplied configuration properties. @@ -82,7 +82,7 @@ public class ConcurrentMessageListenerContainer extends AbstractMessageLis * @param consumerFactory the consumer factory. * @param containerProperties the container properties. */ - public ConcurrentMessageListenerContainer(ConsumerFactory consumerFactory, + public ConcurrentMessageListenerContainer(@Nullable ConsumerFactory consumerFactory, ContainerProperties containerProperties) { super(consumerFactory, containerProperties); @@ -244,7 +244,7 @@ protected void doStart() { if (!isRunning()) { checkTopics(); ContainerProperties containerProperties = getContainerProperties(); - TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions(); + @Nullable TopicPartitionOffset @Nullable [] topicPartitions = containerProperties.getTopicPartitions(); if (topicPartitions != null && this.concurrency > topicPartitions.length) { this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or " + "equal to the number of partitions; reduced from " + this.concurrency + " to " @@ -302,7 +302,7 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer constructContainer(ContainerProperties containerProperties, - @Nullable TopicPartitionOffset[] topicPartitions, int i) { + @Nullable TopicPartitionOffset @Nullable [] topicPartitions, int i) { KafkaMessageListenerContainer container; if (topicPartitions == null) { @@ -315,9 +315,8 @@ private KafkaMessageListenerContainer constructContainer(ContainerProperti return container; } - @Nullable - private TopicPartitionOffset[] partitionSubset(ContainerProperties containerProperties, int index) { - TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions(); + private @Nullable TopicPartitionOffset @Nullable [] partitionSubset(ContainerProperties containerProperties, int index) { + @Nullable TopicPartitionOffset @Nullable [] topicPartitions = containerProperties.getTopicPartitions(); if (topicPartitions == null) { return null; } @@ -434,7 +433,8 @@ public void childStopped(MessageListenerContainer child, Reason reason) { } } - private void publishConcurrentContainerStoppedEvent(Reason reason) { + @SuppressWarnings("NullAway") // Dataflow analysis limitation + private void publishConcurrentContainerStoppedEvent(@Nullable Reason reason) { ApplicationEventPublisher eventPublisher = getApplicationEventPublisher(); if (eventPublisher != null) { eventPublisher.publishEvent(new ConcurrentContainerStoppedEvent(this, reason)); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareListenerErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareListenerErrorHandler.java index d016d04a69..80265dd6eb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareListenerErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareListenerErrorHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.kafka.listener; import org.apache.kafka.clients.consumer.Consumer; +import org.jspecify.annotations.Nullable; import org.springframework.messaging.Message; @@ -38,6 +39,6 @@ default Object handleError(Message message, ListenerExecutionFailedException } @Override - Object handleError(Message message, ListenerExecutionFailedException exception, Consumer consumer); + Object handleError(Message message, ListenerExecutionFailedException exception, @Nullable Consumer consumer); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareMessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareMessageListener.java index 3be310a7d6..dd14f0d802 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareMessageListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareMessageListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.jspecify.annotations.Nullable; /** * Listener for handling individual incoming Kafka messages. @@ -43,6 +44,6 @@ default void onMessage(ConsumerRecord data) { } @Override - void onMessage(ConsumerRecord data, Consumer consumer); + void onMessage(ConsumerRecord data, @Nullable Consumer consumer); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareRecordRecoverer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareRecordRecoverer.java index c69972ca67..6daa7e5ac0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareRecordRecoverer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareRecordRecoverer.java @@ -43,6 +43,6 @@ default void accept(ConsumerRecord record, Exception exception) { * @param exception the exception. * @since 2.7 */ - void accept(ConsumerRecord record, @Nullable Consumer consumer, Exception exception); + void accept(ConsumerRecord record, @Nullable Consumer consumer, @Nullable Exception exception); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java index 74e995b3a6..a06e739c8f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java @@ -51,17 +51,17 @@ public class ConsumerProperties { /** * Topic names. */ - private final String[] topics; + private final @Nullable String @Nullable [] topics; /** * Topic pattern. */ - private final Pattern topicPattern; + private final @Nullable Pattern topicPattern; /** * Topics/partitions/initial offsets. */ - private final TopicPartitionOffset[] topicPartitions; + private final @Nullable TopicPartitionOffset @Nullable [] topicPartitions; /** * The max time to block in the consumer waiting for records. @@ -71,7 +71,7 @@ public class ConsumerProperties { /** * Override the group id. */ - private String groupId; + private @Nullable String groupId; /** * Override the client id. @@ -81,21 +81,21 @@ public class ConsumerProperties { /** * A user defined {@link ConsumerRebalanceListener} implementation. */ - private ConsumerRebalanceListener consumerRebalanceListener; + private @Nullable ConsumerRebalanceListener consumerRebalanceListener; - private Duration syncCommitTimeout; + private @Nullable Duration syncCommitTimeout; /** * The commit callback; by default a simple logging callback is used to log * success at DEBUG level and failures at ERROR level. */ - private OffsetCommitCallback commitCallback; + private @Nullable OffsetCommitCallback commitCallback; /** * A provider for {@link OffsetAndMetadata}; by default, the provider creates an offset and metadata with * empty metadata. The provider gives a way to customize the metadata. */ - private OffsetAndMetadataProvider offsetAndMetadataProvider; + private @Nullable OffsetAndMetadataProvider offsetAndMetadataProvider; /** * Whether or not to call consumer.commitSync() or commitAsync() when the @@ -107,7 +107,7 @@ public class ConsumerProperties { private Properties kafkaConsumerProperties = new Properties(); - private Duration authExceptionRetryInterval; + private @Nullable Duration authExceptionRetryInterval; private int commitRetries = DEFAULT_COMMIT_RETRIES; @@ -137,7 +137,7 @@ public ConsumerProperties(String... topics) { * @param topicPattern the pattern. * @see org.apache.kafka.clients.CommonClientConfigs#METADATA_MAX_AGE_CONFIG */ - public ConsumerProperties(Pattern topicPattern) { + public ConsumerProperties(@Nullable Pattern topicPattern) { this.topics = null; this.topicPattern = topicPattern; this.topicPartitions = null; @@ -160,7 +160,7 @@ public ConsumerProperties(TopicPartitionOffset... topicPartitions) { * @return the topics. */ @Nullable - public String[] getTopics() { + public String @Nullable [] getTopics() { return this.topics != null ? Arrays.copyOf(this.topics, this.topics.length) : null; @@ -181,7 +181,7 @@ public Pattern getTopicPattern() { * @since 2.5 */ @Nullable - public TopicPartitionOffset[] getTopicPartitions() { + public TopicPartitionOffset @Nullable [] getTopicPartitions() { return this.topicPartitions != null ? Arrays.copyOf(this.topicPartitions, this.topicPartitions.length) : null; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java index 4ad123bd1b..c9608aa5e0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java @@ -60,7 +60,7 @@ default void onPartitionsAssigned(Map assignments, Consume * @param partitions the partitions that have been revoked. * @since 2.3 */ - default void onPartitionsRevoked(Collection partitions) { + default void onPartitionsRevoked(@Nullable Collection partitions) { } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerGroupSequencer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerGroupSequencer.java index 1e02ebf844..88f1e9446f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerGroupSequencer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerGroupSequencer.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,8 +19,10 @@ import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashSet; +import java.util.Objects; import org.apache.commons.logging.LogFactory; +import org.jspecify.annotations.Nullable; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; @@ -56,13 +58,13 @@ public class ContainerGroupSequencer implements ApplicationContextAware, private final TaskExecutor executor = new SimpleAsyncTaskExecutor("container-group-sequencer-"); - private ApplicationContext applicationContext; + private @Nullable ApplicationContext applicationContext; private boolean stopLastGroupWhenIdle; - private Iterator iterator; + private @Nullable Iterator iterator; - private ContainerGroup currentGroup; + private @Nullable ContainerGroup currentGroup; private boolean autoStartup = true; @@ -136,7 +138,7 @@ public synchronized void onApplicationEvent(ListenerContainerIdleEvent event) { MessageListenerContainer parent = event.getContainer(MessageListenerContainer.class); MessageListenerContainer container = (MessageListenerContainer) event.getSource(); boolean inCurrentGroup = this.currentGroup != null && this.currentGroup.contains(parent); - if (this.running && inCurrentGroup && (this.iterator.hasNext() || this.stopLastGroupWhenIdle)) { + if (this.running && inCurrentGroup && (Objects.requireNonNull(this.iterator).hasNext() || this.stopLastGroupWhenIdle)) { this.executor.execute(() -> { LOGGER.debug(() -> "Stopping: " + container); container.stop(() -> { @@ -157,9 +159,16 @@ private synchronized void stopParentAndCheckGroup(MessageListenerContainer paren LOGGER.debug(() -> "Stopping: " + parent); parent.stop(() -> { if (this.currentGroup != null) { - LOGGER.debug(() -> "Checking group: " + this.currentGroup.toString()); + LOGGER.debug(() -> { + if (this.currentGroup != null) { + return "Checking group: " + this.currentGroup.toString(); + } + else { + return "Current group is null"; + } + }); if (this.currentGroup.allStopped()) { - if (this.iterator.hasNext()) { + if (Objects.requireNonNull(this.iterator).hasNext()) { this.currentGroup = this.iterator.next(); LOGGER.debug(() -> "Starting next group: " + this.currentGroup); this.currentGroup.start(); @@ -185,7 +194,7 @@ public synchronized void start() { public void initialize() { this.groups.clear(); for (String group : this.groupNames) { - this.groups.add(this.applicationContext.getBean(group + ".group", ContainerGroup.class)); + this.groups.add(Objects.requireNonNull(this.applicationContext).getBean(group + ".group", ContainerGroup.class)); } if (!this.groups.isEmpty()) { this.iterator = this.groups.iterator(); @@ -194,7 +203,7 @@ public void initialize() { Collection ids = grp.getListenerIds(); ids.stream().forEach(id -> { MessageListenerContainer container = this.registry.getListenerContainer(id); - if (container.getContainerProperties().getIdleEventInterval() == null) { + if (Objects.requireNonNull(container).getContainerProperties().getIdleEventInterval() == null) { container.getContainerProperties().setIdleEventInterval(this.defaultIdleEventInterval); container.setAutoStartup(false); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java index 7d850ae56a..774c99f474 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.kafka.listener; +import org.jspecify.annotations.Nullable; + import org.springframework.context.ApplicationContext; import org.springframework.util.Assert; @@ -28,7 +30,7 @@ */ public class ContainerPartitionPausingBackOffManagerFactory extends AbstractKafkaBackOffManagerFactory { - private BackOffHandler backOffHandler; + private @Nullable BackOffHandler backOffHandler; /** * Construct an instance with the provided properties. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPausingBackOffHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPausingBackOffHandler.java index 935b7f7c3b..1c7ebc6266 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPausingBackOffHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPausingBackOffHandler.java @@ -43,7 +43,7 @@ public ContainerPausingBackOffHandler(ListenerContainerPauseService pauser) { } @Override - public void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) { + public void onNextBackOff(@Nullable MessageListenerContainer container, @Nullable Exception exception, long nextBackOff) { if (container == null) { this.defaultBackOffHandler.onNextBackOff(container, exception, nextBackOff); // NOSONAR } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java index c1d20507a8..7f11a3e328 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java @@ -239,12 +239,12 @@ public enum EOSMode { * The message listener; must be a {@link org.springframework.kafka.listener.MessageListener} * or {@link org.springframework.kafka.listener.AcknowledgingMessageListener}. */ - private Object messageListener; + private @Nullable Object messageListener; /** * The executor for threads that poll the consumer. */ - private AsyncTaskExecutor listenerTaskExecutor; + private @Nullable AsyncTaskExecutor listenerTaskExecutor; /** * The timeout for shutting down the container. This is the maximum amount of @@ -253,22 +253,22 @@ public enum EOSMode { */ private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT; - private Long idleEventInterval; + private @Nullable Long idleEventInterval; - private Long idlePartitionEventInterval; + private @Nullable Long idlePartitionEventInterval; private double idleBeforeDataMultiplier = DEFAULT_IDLE_BEFORE_DATA_MULTIPLIER; @Deprecated(since = "3.2") - private PlatformTransactionManager transactionManager; + private @Nullable PlatformTransactionManager transactionManager; - private KafkaAwareTransactionManager kafkaAwareTransactionManager; + private @Nullable KafkaAwareTransactionManager kafkaAwareTransactionManager; private boolean batchRecoverAfterRollback = false; private int monitorInterval = DEFAULT_MONITOR_INTERVAL; - private TaskScheduler scheduler; + private @Nullable TaskScheduler scheduler; private float noPollThreshold = DEFAULT_NO_POLL_THRESHOLD; @@ -286,7 +286,7 @@ public enum EOSMode { private Duration consumerStartTimeout = DEFAULT_CONSUMER_START_TIMEOUT; - private Boolean subBatchPerPartition; + private @Nullable Boolean subBatchPerPartition; private AssignmentCommitOption assignmentCommitOption = AssignmentCommitOption.LATEST_ONLY_NO_TX; @@ -294,7 +294,7 @@ public enum EOSMode { private EOSMode eosMode = EOSMode.V2; - private TransactionDefinition transactionDefinition; + private @Nullable TransactionDefinition transactionDefinition; private boolean stopContainerWhenFenced; @@ -304,7 +304,7 @@ public enum EOSMode { private boolean pauseImmediate; - private KafkaListenerObservationConvention observationConvention; + private @Nullable KafkaListenerObservationConvention observationConvention; private Duration pollTimeoutWhilePaused = DEFAULT_PAUSED_POLL_TIMEOUT; @@ -327,7 +327,7 @@ public ContainerProperties(String... topics) { * @param topicPattern the pattern. * @see org.apache.kafka.clients.CommonClientConfigs#METADATA_MAX_AGE_CONFIG */ - public ContainerProperties(Pattern topicPattern) { + public ContainerProperties(@Nullable Pattern topicPattern) { super(topicPattern); } @@ -486,7 +486,7 @@ public long getAckTime() { return this.ackTime; } - public Object getMessageListener() { + public @Nullable Object getMessageListener() { return this.messageListener; } @@ -1028,14 +1028,16 @@ private void adviseListenerIfNeeded() { this.adviceChain.forEach(advised::addAdvice); } else { - ProxyFactory pf = new ProxyFactory(this.messageListener); - this.adviceChain.forEach(pf::addAdvice); - this.messageListener = pf.getProxy(); + if (this.messageListener != null) { + ProxyFactory pf = new ProxyFactory(this.messageListener); + this.adviceChain.forEach(pf::addAdvice); + this.messageListener = pf.getProxy(); + } } } } - public KafkaListenerObservationConvention getObservationConvention() { + public @Nullable KafkaListenerObservationConvention getObservationConvention() { return this.observationConvention; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java index 4a1747c1e2..72870491c1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java @@ -25,6 +25,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -87,11 +88,11 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement private final BiFunction, Exception, TopicPartition> destinationResolver; - private final Function, KafkaOperations> templateResolver; + private final Function, @Nullable KafkaOperations> templateResolver; private final EnumSet whichHeaders = EnumSet.allOf(HeaderNames.HeadersToAdd.class); - private HeaderNames headerNames = getHeaderNames(); + private @Nullable HeaderNames headerNames = getHeaderNames(); private boolean retainExceptionHeader; @@ -174,7 +175,7 @@ public DeadLetterPublishingRecoverer(KafkaOperations, KafkaOperations> templates) { + public DeadLetterPublishingRecoverer(Map, @Nullable KafkaOperations> templates) { this(templates, DEFAULT_DESTINATION_RESOLVER); } @@ -192,7 +193,7 @@ public DeadLetterPublishingRecoverer(Map, KafkaOperations, KafkaOperations> templates, + public DeadLetterPublishingRecoverer(Map, @Nullable KafkaOperations> templates, BiFunction, Exception, TopicPartition> destinationResolver) { Assert.isTrue(!ObjectUtils.isEmpty(templates), "At least one template is required"); @@ -205,7 +206,7 @@ public DeadLetterPublishingRecoverer(Map, KafkaOperations t.isTransactional()) + .map(t -> Objects.requireNonNull(t).isTransactional()) .allMatch(t -> t.equals(tx)), "All templates must have the same setting for transactional"); this.destinationResolver = destinationResolver; } @@ -222,7 +223,7 @@ public DeadLetterPublishingRecoverer(Map, KafkaOperations, KafkaOperations> templateResolver, + public DeadLetterPublishingRecoverer(Function, @Nullable KafkaOperations> templateResolver, BiFunction, Exception, TopicPartition> destinationResolver) { this(templateResolver, false, destinationResolver); } @@ -240,7 +241,7 @@ public DeadLetterPublishingRecoverer(Function, KafkaOperati * @param destinationResolver the resolving function. * @since 2.7 */ - public DeadLetterPublishingRecoverer(Function, KafkaOperations> templateResolver, + public DeadLetterPublishingRecoverer(Function, @Nullable KafkaOperations> templateResolver, boolean transactional, BiFunction, Exception, TopicPartition> destinationResolver) { @@ -498,7 +499,7 @@ public void addHeadersFunction(BiFunction, Exception, Heade } } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "NullAway"}) @Override public void accept(ConsumerRecord record, @Nullable Consumer consumer, Exception exception) { TopicPartition tp = this.destinationResolver.apply(record, exception); @@ -626,8 +627,8 @@ private TopicPartition checkPartition(TopicPartition tp, Consumer consumer } @SuppressWarnings("unchecked") - private KafkaOperations findTemplateForValue(@Nullable Object value, - Map, KafkaOperations> templates) { + private @Nullable KafkaOperations findTemplateForValue(@Nullable Object value, + Map, @Nullable KafkaOperations> templates) { if (value == null) { KafkaOperations operations = templates.get(Void.class); @@ -667,6 +668,7 @@ private KafkaOperations findTemplateForValue(@Nullable Object va * @return the producer record to send. * @see KafkaHeaders */ + @SuppressWarnings("NullAway") // Dataflow analysis limitation protected ProducerRecord createProducerRecord(ConsumerRecord record, TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) { @@ -778,7 +780,7 @@ private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord record, E } private void maybeAddOriginalHeaders(Headers kafkaHeaders, ConsumerRecord record, Exception ex) { - maybeAddHeader(kafkaHeaders, this.headerNames.original.topicHeader, + maybeAddHeader(kafkaHeaders, Objects.requireNonNull(this.headerNames).original.topicHeader, () -> record.topic().getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.TOPIC); maybeAddHeader(kafkaHeaders, this.headerNames.original.partitionHeader, () -> ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array(), @@ -837,13 +839,13 @@ private void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception, } @Nullable - private String buildMessage(Exception exception, Throwable cause) { + private String buildMessage(Exception exception, @Nullable Throwable cause) { String message = exception.getMessage(); if (!exception.equals(cause)) { if (message != null) { message = message + "; "; } - String causeMsg = cause.getMessage(); + String causeMsg = Objects.requireNonNull(cause).getMessage(); if (causeMsg != null) { if (message != null) { message = message + causeMsg; @@ -1195,17 +1197,17 @@ public static Builder.Original original() { */ public class Original { - private String offsetHeader; + private @Nullable String offsetHeader; - private String timestampHeader; + private @Nullable String timestampHeader; - private String timestampTypeHeader; + private @Nullable String timestampTypeHeader; - private String topicHeader; + private @Nullable String topicHeader; - private String partitionHeader; + private @Nullable String partitionHeader; - private String consumerGroupHeader; + private @Nullable String consumerGroupHeader; /** * Sets the name of the header that will be used to store the offset @@ -1317,19 +1319,19 @@ private DeadLetterPublishingRecoverer.HeaderNames.Original build() { */ public class ExceptionInfo { - private String keyExceptionFqcn; + private @Nullable String keyExceptionFqcn; - private String exceptionFqcn; + private @Nullable String exceptionFqcn; - private String exceptionCauseFqcn; + private @Nullable String exceptionCauseFqcn; - private String keyExceptionMessage; + private @Nullable String keyExceptionMessage; - private String exceptionMessage; + private @Nullable String exceptionMessage; - private String keyExceptionStacktrace; + private @Nullable String keyExceptionStacktrace; - private String exceptionStacktrace; + private @Nullable String exceptionStacktrace; /** * Sets the name of the header that will be used to store the keyExceptionFqcn diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java index 68ceb31b5f..a08a6987bc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; @@ -65,7 +66,7 @@ public class DefaultAfterRollbackProcessor extends FailedRecordProcessor private final BackOff backOff; - private final KafkaOperations kafkaTemplate; + private final @Nullable KafkaOperations kafkaTemplate; private final BiConsumer, Exception> recoverer; @@ -169,7 +170,7 @@ public void process(List> records, Consumer consumer, if (SeekUtils.doSeeks((List) records, consumer, exception, recoverable, getFailureTracker(), container, this.logger) - && isCommitRecovered() && this.kafkaTemplate.isTransactional()) { + && isCommitRecovered() && Objects.requireNonNull(this.kafkaTemplate).isTransactional()) { ConsumerRecord skipped = records.get(0); this.kafkaTemplate.sendOffsetsToTransaction( Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()), diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultBackOffHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultBackOffHandler.java index a626eec502..3d3dad7d8f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultBackOffHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultBackOffHandler.java @@ -30,7 +30,7 @@ public class DefaultBackOffHandler implements BackOffHandler { @Override - public void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) { + public void onNextBackOff(@Nullable MessageListenerContainer container, @Nullable Exception exception, long nextBackOff) { try { if (container == null) { Thread.sleep(nextBackOff); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java index b35fbb371d..1410ab7647 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2024 the original author or authors. + * Copyright 2021-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.header.internals.RecordHeader; +import org.jspecify.annotations.Nullable; import org.springframework.kafka.support.KafkaHeaders; @@ -37,7 +38,7 @@ public class DeliveryAttemptAwareRetryListener implements RetryListener { @Override - public void failedDelivery(ConsumerRecord record, Exception ex, int deliveryAttempt) { + public void failedDelivery(ConsumerRecord record, @Nullable Exception ex, int deliveryAttempt) { // Pass } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java index 343a074a9d..76cc94181c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.function.BiConsumer; @@ -152,7 +153,8 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r logger.debug(ex, () -> "Retry failed for: " + toLog); recoveryException = ex; Exception newException = unwrapIfNeeded(ex); - if (reClassifyOnExceptionChange && !newException.getClass().equals(lastException.getClass()) + if (reClassifyOnExceptionChange && !Objects.requireNonNull(newException).getClass() + .equals(Objects.requireNonNull(lastException).getClass()) && !classifier.classify(newException)) { break; @@ -204,7 +206,7 @@ public static String recordsToString(ConsumerRecords records) { * @return the unwrapped cause or cause of cause. * @since 2.8.11 */ - public static Exception unwrapIfNeeded(Exception exception) { + public static @Nullable Exception unwrapIfNeeded(@Nullable Exception exception) { Exception theEx = exception; if (theEx instanceof TimestampedException && theEx.getCause() instanceof Exception cause) { theEx = cause; @@ -222,7 +224,7 @@ public static Exception unwrapIfNeeded(Exception exception) { * @return the root cause. * @since 3.0.7 */ - public static Exception findRootCause(Exception exception) { + public static @Nullable Exception findRootCause(@Nullable Exception exception) { Exception realException = exception; while ((realException instanceof ListenerExecutionFailedException || realException instanceof TimestampedException) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java index aedcebc93d..2e2ea5d82e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java @@ -41,7 +41,7 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen private static final BackOff NO_RETRIES_OR_DELAY_BACKOFF = new FixedBackOff(0L, 0L); - private final BiFunction, Exception, BackOff> noRetriesForClassified = + private final BiFunction, @Nullable Exception, BackOff> noRetriesForClassified = (rec, ex) -> { Exception theEx = ErrorHandlingUtils.unwrapIfNeeded(ex); if (!getClassifier().classify(theEx) || theEx instanceof KafkaBackoffException) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java index a6120377ad..f22cfd3eaa 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; @@ -57,7 +58,7 @@ class FailedRecordTracker implements RecoveryStrategy { private final BackOff backOff; - private BiFunction, Exception, BackOff> backOffFunction; + private @Nullable BiFunction, @Nullable Exception, BackOff> backOffFunction; private final BackOffHandler backOffHandler; @@ -71,6 +72,7 @@ class FailedRecordTracker implements RecoveryStrategy { this(recoverer, backOff, null, logger); } + @SuppressWarnings("NullAway") // Dataflow analysis limitation FailedRecordTracker(@Nullable BiConsumer, Exception> recoverer, BackOff backOff, @Nullable BackOffHandler backOffHandler, LogAccessor logger) { @@ -111,7 +113,7 @@ class FailedRecordTracker implements RecoveryStrategy { * @param backOffFunction the function. * @since 2.6 */ - public void setBackOffFunction(@Nullable BiFunction, Exception, BackOff> backOffFunction) { + public void setBackOffFunction(@Nullable BiFunction, @Nullable Exception, BackOff> backOffFunction) { this.backOffFunction = backOffFunction; } @@ -165,7 +167,7 @@ boolean skip(ConsumerRecord record, Exception exception) { } @Override - public boolean recovered(ConsumerRecord record, Exception exception, + public boolean recovered(ConsumerRecord record, @Nullable Exception exception, @Nullable MessageListenerContainer container, @Nullable Consumer consumer) throws InterruptedException { @@ -194,14 +196,14 @@ public boolean recovered(ConsumerRecord record, Exception exception, } } - private FailedRecord getFailedRecordInstance(ConsumerRecord record, Exception exception, + private FailedRecord getFailedRecordInstance(ConsumerRecord record, @Nullable Exception exception, Map map, TopicPartition topicPartition) { Exception realException = ErrorHandlingUtils.findRootCause(exception); FailedRecord failedRecord = map.get(topicPartition); if (failedRecord == null || failedRecord.getOffset() != record.offset() || (this.resetStateOnExceptionChange - && !realException.getClass().isInstance(failedRecord.getLastException()))) { + && !Objects.requireNonNull(realException).getClass().isInstance(failedRecord.getLastException()))) { failedRecord = new FailedRecord(record.offset(), determineBackOff(record, realException).start()); map.put(topicPartition, failedRecord); @@ -213,7 +215,7 @@ private FailedRecord getFailedRecordInstance(ConsumerRecord record, Except return failedRecord; } - private BackOff determineBackOff(ConsumerRecord record, Exception exception) { + private BackOff determineBackOff(ConsumerRecord record, @Nullable Exception exception) { if (this.backOffFunction == null) { return this.backOff; } @@ -221,8 +223,8 @@ private BackOff determineBackOff(ConsumerRecord record, Exception exceptio return backOffToUse != null ? backOffToUse : this.backOff; } - private void attemptRecovery(ConsumerRecord record, Exception exception, @Nullable TopicPartition tp, - Consumer consumer) { + private void attemptRecovery(ConsumerRecord record, @Nullable Exception exception, @Nullable TopicPartition tp, + @Nullable Consumer consumer) { try { this.recoverer.accept(record, consumer, exception); @@ -231,7 +233,10 @@ private void attemptRecovery(ConsumerRecord record, Exception exception, @ catch (RuntimeException e) { this.retryListeners.forEach(rl -> rl.recoveryFailed(record, exception, e)); if (tp != null && this.resetStateOnRecoveryFailure) { - this.failures.get(Thread.currentThread()).remove(tp); + Map topicPartitionFailedRecordMap = this.failures.get(Thread.currentThread()); + if (topicPartitionFailedRecordMap != null) { + topicPartitionFailedRecordMap.remove(tp); + } } throw e; } @@ -257,7 +262,11 @@ int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) { return 1; } FailedRecord failedRecord = map.get(topicPartitionOffset.getTopicPartition()); - if (failedRecord == null || failedRecord.getOffset() != topicPartitionOffset.getOffset()) { + if (failedRecord == null) { + return 1; + } + Long offsetValue = topicPartitionOffset.getOffset(); + if (offsetValue != null && failedRecord.getOffset() != offsetValue) { return 1; } return failedRecord.getDeliveryAttempts().get() + 1; @@ -271,7 +280,7 @@ static final class FailedRecord { private final AtomicInteger deliveryAttempts = new AtomicInteger(1); - private Exception lastException; + private @Nullable Exception lastException; FailedRecord(long offset, BackOffExecution backOffExecution) { this.offset = offset; @@ -290,11 +299,12 @@ AtomicInteger getDeliveryAttempts() { return this.deliveryAttempts; } + @Nullable Exception getLastException() { return this.lastException; } - void setLastException(Exception lastException) { + void setLastException(@Nullable Exception lastException) { this.lastException = lastException; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaConsumerBackoffManager.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaConsumerBackoffManager.java index 9b8becc8b3..6f973e1422 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaConsumerBackoffManager.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaConsumerBackoffManager.java @@ -62,7 +62,7 @@ class Context { /** * The consumer of the message, if present. */ - private final Consumer consumerForTimingAdjustment; + private final @Nullable Consumer consumerForTimingAdjustment; Context(long dueTimestamp, TopicPartition topicPartition, String listenerId, @Nullable Consumer consumerForTimingAdjustment) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java index 3a56e00b04..094adf41ea 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java @@ -57,7 +57,7 @@ public interface KafkaListenerErrorHandler { * {@code @SendTo} annotation. */ default Object handleError(Message message, ListenerExecutionFailedException exception, - Consumer consumer) { + @Nullable Consumer consumer) { return handleError(message, exception); } @@ -74,7 +74,7 @@ default Object handleError(Message message, ListenerExecutionFailedException */ @Nullable default Object handleError(Message message, ListenerExecutionFailedException exception, - Consumer consumer, @Nullable Acknowledgment ack) { + @Nullable Consumer consumer, @Nullable Acknowledgment ack) { return handleError(message, exception, consumer); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 87222dfea0..e8e4c14884 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -187,15 +187,17 @@ public class KafkaMessageListenerContainer // NOSONAR line count private final AbstractMessageListenerContainer thisOrParentContainer; - private final TopicPartitionOffset[] topicPartitions; + private final @Nullable TopicPartitionOffset @Nullable [] topicPartitions; - private String clientIdSuffix; + private @Nullable String clientIdSuffix; private Runnable emergencyStop = () -> stopAbnormally(() -> { }); + @SuppressWarnings("NullAway.Init") private volatile ListenerConsumer listenerConsumer; + @SuppressWarnings("NullAway.Init") private volatile CompletableFuture listenerConsumerFuture; private volatile CountDownLatch startLatch = new CountDownLatch(1); @@ -234,7 +236,7 @@ public KafkaMessageListenerContainer(ConsumerFactory consu */ KafkaMessageListenerContainer(@Nullable AbstractMessageListenerContainer container, ConsumerFactory consumerFactory, - ContainerProperties containerProperties, @Nullable TopicPartitionOffset... topicPartitions) { + ContainerProperties containerProperties, @Nullable TopicPartitionOffset @Nullable ... topicPartitions) { super(consumerFactory, containerProperties); Assert.notNull(consumerFactory, "A ConsumerFactory must be provided"); @@ -373,6 +375,7 @@ protected void doStart() { containerProperties.setListenerTaskExecutor(consumerExecutor); } GenericMessageListener listener = (GenericMessageListener) messageListener; + Assert.state(listener != null, "'messageListener' cannot be null"); ListenerType listenerType = determineListenerType(listener); ObservationRegistry observationRegistry = containerProperties.getObservationRegistry(); if (observationRegistry.isNoop()) { @@ -635,11 +638,11 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final GenericMessageListener genericListener; - private final ConsumerSeekAware consumerSeekAwareListener; + private final @Nullable ConsumerSeekAware consumerSeekAwareListener; - private final MessageListener listener; + private final @Nullable MessageListener listener; - private final BatchMessageListener batchListener; + private final @Nullable BatchMessageListener batchListener; private final ListenerType listenerType; @@ -675,22 +678,22 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final BlockingQueue seeks = new LinkedBlockingQueue<>(); - private final CommonErrorHandler commonErrorHandler; + private final @Nullable CommonErrorHandler commonErrorHandler; @Deprecated(since = "3.2", forRemoval = true) @SuppressWarnings("removal") - private final PlatformTransactionManager transactionManager = + private final @Nullable PlatformTransactionManager transactionManager = this.containerProperties.getKafkaAwareTransactionManager() != null ? this.containerProperties.getKafkaAwareTransactionManager() : this.containerProperties.getTransactionManager(); - private final KafkaAwareTransactionManager kafkaTxManager = + private final @Nullable KafkaAwareTransactionManager kafkaTxManager = this.transactionManager instanceof KafkaAwareTransactionManager kafkaAwareTransactionManager ? kafkaAwareTransactionManager : null; - private final TransactionTemplate transactionTemplate; + private final @Nullable TransactionTemplate transactionTemplate; - private final String consumerGroupId = KafkaMessageListenerContainer.this.getGroupId(); + private final @Nullable String consumerGroupId = KafkaMessageListenerContainer.this.getGroupId(); private final TaskScheduler taskScheduler; @@ -709,39 +712,39 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final boolean syncCommits = this.containerProperties.isSyncCommits(); - private final Duration syncCommitTimeout; + private final @Nullable Duration syncCommitTimeout; - private final RecordInterceptor recordInterceptor = + private final @Nullable RecordInterceptor recordInterceptor = !isInterceptBeforeTx() || this.transactionManager == null ? getRecordInterceptor() : null; - private final RecordInterceptor earlyRecordInterceptor = + private final @Nullable RecordInterceptor earlyRecordInterceptor = isInterceptBeforeTx() && this.transactionManager != null ? getRecordInterceptor() : null; - private final RecordInterceptor commonRecordInterceptor = getRecordInterceptor(); + private final @Nullable RecordInterceptor commonRecordInterceptor = getRecordInterceptor(); - private final BatchInterceptor batchInterceptor = + private final @Nullable BatchInterceptor batchInterceptor = !isInterceptBeforeTx() || this.transactionManager == null ? getBatchInterceptor() : null; - private final BatchInterceptor earlyBatchInterceptor = + private final @Nullable BatchInterceptor earlyBatchInterceptor = isInterceptBeforeTx() && this.transactionManager != null ? getBatchInterceptor() : null; - private final BatchInterceptor commonBatchInterceptor = getBatchInterceptor(); + private final @Nullable BatchInterceptor commonBatchInterceptor = getBatchInterceptor(); - private final ThreadStateProcessor pollThreadStateProcessor; + private final @Nullable ThreadStateProcessor pollThreadStateProcessor; private final ConsumerSeekCallback seekCallback = new InitialOrIdleSeekCallback(); private final long maxPollInterval; - private final MicrometerHolder micrometerHolder; + private final @Nullable MicrometerHolder micrometerHolder; private final boolean observationEnabled; @@ -749,20 +752,20 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final boolean subBatchPerPartition = this.containerProperties.isSubBatchPerPartition(); - private final Duration authExceptionRetryInterval = + private final @Nullable Duration authExceptionRetryInterval = this.containerProperties.getAuthExceptionRetryInterval(); private final AssignmentCommitOption autoCommitOption = this.containerProperties.getAssignmentCommitOption(); private final boolean commitCurrentOnAssignment; - private final DeliveryAttemptAware deliveryAttemptAware; + private final @Nullable DeliveryAttemptAware deliveryAttemptAware; private final EOSMode eosMode = this.containerProperties.getEosMode(); private final Map commitsDuringRebalance = new HashMap<>(); - private final String clientId; + private final @Nullable String clientId; private final boolean fixTxOffsets = this.containerProperties.isFixTxOffsets(); @@ -770,9 +773,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final Set pausedPartitions = new HashSet<>(); - private final Map> offsetsInThisBatch; + private final @Nullable Map> offsetsInThisBatch; - private final Map>> deferredOffsets; + private final @Nullable Map>> deferredOffsets; private final Map lastReceivePartition; @@ -793,15 +796,15 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume @Nullable private final KafkaAdmin kafkaAdmin; - private final Object bootstrapServers; + private final @Nullable Object bootstrapServers; @Nullable private final Function, Map> micrometerTagsProvider = this.containerProperties.getMicrometerTagsProvider(); - private String clusterId; + private @Nullable String clusterId; - private Map definedPartitions; + private @Nullable Map definedPartitions; private int count; @@ -821,11 +824,11 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private int nackIndex; - private Iterator batchIterator; + private @Nullable Iterator batchIterator; - private ConsumerRecords lastBatch; + private @Nullable ConsumerRecords lastBatch; - private Producer producer; + private @Nullable Producer producer; private boolean wasIdle; @@ -835,7 +838,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private boolean receivedSome; - private ConsumerRecords remainingRecords; + private @Nullable ConsumerRecords remainingRecords; private boolean pauseForPending; @@ -843,7 +846,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private volatile boolean consumerPaused; - private volatile Thread consumerThread; + private volatile @Nullable Thread consumerThread; private volatile long lastPoll = System.currentTimeMillis(); @@ -969,7 +972,10 @@ else if (listener instanceof MessageListener) { this.kafkaAdmin = obtainAdmin(); if (isListenerAdapterObservationAware()) { - ((RecordMessagingMessageListenerAdapter) this.listener).setObservationRegistry(observationRegistry); + RecordMessagingMessageListenerAdapter recordMessagingMessageListenerAdapter = (RecordMessagingMessageListenerAdapter) this.listener; + if (recordMessagingMessageListenerAdapter != null) { + recordMessagingMessageListenerAdapter.setObservationRegistry(observationRegistry); + } } } @@ -1003,7 +1009,8 @@ private KafkaAdmin obtainAdmin() { KafkaAdmin admin = applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique(); if (admin != null) { Map props = new HashMap<>(admin.getConfigurationProperties()); - if (!props.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG).equals(this.bootstrapServers)) { + Object bootstrapServer = props.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG); + if (bootstrapServer != null && !bootstrapServer.equals(this.bootstrapServers)) { props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); int opTo = admin.getOperationTimeout(); admin = new KafkaAdmin(props); @@ -1047,11 +1054,12 @@ private CommonErrorHandler determineCommonErrorHandler() { return common; } + @Nullable String getClientId() { return this.clientId; } - private String determineClientId() { + private @Nullable String determineClientId() { Map metrics = this.consumer.metrics(); Iterator metricIterator = metrics.keySet().iterator(); if (metricIterator.hasNext()) { @@ -1087,7 +1095,7 @@ private DeliveryAttemptAware setupDeliveryAttemptAware() { } } else { - if (this.commonErrorHandler.deliveryAttemptHeader()) { + if (Objects.requireNonNull(this.commonErrorHandler).deliveryAttemptHeader()) { aware = this.commonErrorHandler; } } @@ -1115,6 +1123,7 @@ private boolean determineCommitCurrent(Properties consumerProperties, Map> mergedProvider = + Function<@Nullable Object, Map> mergedProvider = cr -> this.containerProperties.getMicrometerTags(); if (this.micrometerTagsProvider != null) { mergedProvider = cr -> { @@ -1297,7 +1308,7 @@ private MicrometerHolder obtainMicrometerHolder() { } private void seekPartitions(Collection partitions, boolean idle) { - this.consumerSeekAwareListener.registerSeekCallback(this); + Objects.requireNonNull(this.consumerSeekAwareListener).registerSeekCallback(this); Map current = new HashMap<>(); for (TopicPartition topicPartition : partitions) { current.put(topicPartition, ListenerConsumer.this.consumer.position(topicPartition)); @@ -1512,7 +1523,7 @@ private void doProcessCommits() { for (ConsumerRecord kvConsumerRecord : pending) { records.add(kvConsumerRecord); } - this.commonErrorHandler.handleRemaining(cfe, records, this.consumer, + Objects.requireNonNull(this.commonErrorHandler).handleRemaining(cfe, records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer); } } @@ -1626,7 +1637,7 @@ private void fixTxOffsetsIfNeeded() { commitOffsets(toFix); } else { - this.transactionTemplate.executeWithoutResult(status -> { + Objects.requireNonNull(this.transactionTemplate).executeWithoutResult(status -> { doSendOffsets(getTxProducer(), toFix); }); } @@ -1657,7 +1668,7 @@ private ConsumerRecords doPoll() { } } TopicPartition next = this.batchIterator.next(); - List> subBatch = this.lastBatch.records(next); + List> subBatch = Objects.requireNonNull(this.lastBatch).records(next); records = new ConsumerRecords<>(Collections.singletonMap(next, subBatch)); if (!this.batchIterator.hasNext()) { this.batchIterator = null; @@ -1705,7 +1716,7 @@ private void beforePoll() { private synchronized void captureOffsets(ConsumerRecords records) { if (this.offsetsInThisBatch != null && records.count() > 0) { this.offsetsInThisBatch.clear(); - this.deferredOffsets.clear(); + Objects.requireNonNull(this.deferredOffsets).clear(); records.partitions().forEach(part -> { LinkedList offs = new LinkedList<>(); this.offsetsInThisBatch.put(part, offs); @@ -1844,7 +1855,7 @@ else if (this.offsetsInThisBatch != null) { } private void doResumeConsumerIfNeccessary() { - if (this.pausedForAsyncAcks && this.offsetsInThisBatch.isEmpty()) { + if (this.pausedForAsyncAcks && Objects.requireNonNull(this.offsetsInThisBatch).isEmpty()) { this.pausedForAsyncAcks = false; this.logger.debug("Resuming after manual async acks cleared"); } @@ -2097,13 +2108,13 @@ private void processAcks(ConsumerRecords records) { private synchronized void ackInOrder(ConsumerRecord cRecord) { TopicPartition part = new TopicPartition(cRecord.topic(), cRecord.partition()); - List offs = this.offsetsInThisBatch.get(part); + List offs = Objects.requireNonNull(this.offsetsInThisBatch).get(part); if (!ObjectUtils.isEmpty(offs)) { - List> deferred = this.deferredOffsets.get(part); + List> deferred = Objects.requireNonNull(this.deferredOffsets).get(part); if (offs.get(0) == cRecord.offset()) { offs.remove(0); ConsumerRecord recordToAck = cRecord; - if (!deferred.isEmpty()) { + if (!CollectionUtils.isEmpty(deferred)) { deferred.sort((a, b) -> Long.compare(a.offset(), b.offset())); while (!ObjectUtils.isEmpty(deferred) && deferred.get(0).offset() == recordToAck.offset() + 1) { recordToAck = deferred.remove(0); @@ -2121,7 +2132,9 @@ else if (cRecord.offset() < offs.get(0)) { + "; you are acknowledging a stale record: " + KafkaUtils.format(cRecord)); } else { + if (deferred != null) { deferred.add(cRecord); + } } } else { @@ -2158,7 +2171,7 @@ private void invokeBatchListener(final ConsumerRecords recordsArg) { if (records == null || records.count() == 0) { return; } - List> recordList = null; + List> recordList = new ArrayList<>(); if (!this.wantsFullRecords) { recordList = createRecordList(records); } @@ -2173,10 +2186,10 @@ private void invokeBatchListener(final ConsumerRecords recordsArg) { } private void invokeBatchListenerInTx(final ConsumerRecords records, - @Nullable final List> recordList) { + final List> recordList) { try { - this.transactionTemplate.execute(new TransactionCallbackWithoutResult() { + Objects.requireNonNull(this.transactionTemplate).execute(new TransactionCallbackWithoutResult() { @Override public void doInTransactionWithoutResult(TransactionStatus s) { @@ -2205,7 +2218,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) { } private void batchRollback(final ConsumerRecords records, - @Nullable final List> recordList, RuntimeException e) { + final List> recordList, RuntimeException e) { @SuppressWarnings(UNCHECKED) AfterRollbackProcessor afterRollbackProcessorToUse = @@ -2298,7 +2311,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords recor } private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecords records) { - if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle() && this.consumerGroupId != null) + if ((!this.autoCommit && Objects.requireNonNull(this.commonErrorHandler).isAckAfterHandle() && this.consumerGroupId != null) || this.producer != null) { if (this.remainingRecords != null) { ConsumerRecord firstUncommitted = this.remainingRecords.iterator().next(); @@ -2345,7 +2358,7 @@ private Object startMicrometerSample() { } private void successTimer(@Nullable Object sample, @Nullable ConsumerRecord record) { - if (sample != null) { + if (sample != null && this.micrometerHolder != null) { if (this.micrometerTagsProvider == null || record == null) { this.micrometerHolder.success(sample); } @@ -2362,11 +2375,13 @@ private void failureTimer(@Nullable Object sample, @Nullable ConsumerRecord records) throws InterruptedExc } private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords recordsArg, - @Nullable List> recordListArg) { + List> recordListArg) { ConsumerRecords records = recordsArg; List> recordList = recordListArg; @@ -2430,7 +2445,7 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords r Object sample = startMicrometerSample(); try { if (this.wantsFullRecords) { - this.batchListener.onMessage(records, // NOSONAR + Objects.requireNonNull(this.batchListener).onMessage(records, // NOSONAR this.isAnyManualAck ? new ConsumerBatchAcknowledgment(records, recordList) : null, @@ -2451,22 +2466,22 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords r } private void doInvokeBatchOnMessage(final ConsumerRecords records, - @Nullable List> recordList) { + List> recordList) { try { switch (this.listenerType) { case ACKNOWLEDGING_CONSUMER_AWARE -> - this.batchListener.onMessage(recordList, + Objects.requireNonNull(this.batchListener).onMessage(recordList, this.isAnyManualAck ? new ConsumerBatchAcknowledgment(records, recordList) : null, this.consumer); case ACKNOWLEDGING -> - this.batchListener.onMessage(recordList, + Objects.requireNonNull(this.batchListener).onMessage(recordList, this.isAnyManualAck ? new ConsumerBatchAcknowledgment(records, recordList) : null); - case CONSUMER_AWARE -> this.batchListener.onMessage(recordList, this.consumer); - case SIMPLE -> this.batchListener.onMessage(recordList); + case CONSUMER_AWARE -> Objects.requireNonNull(this.batchListener).onMessage(recordList, this.consumer); + case SIMPLE -> Objects.requireNonNull(this.batchListener).onMessage(recordList); } } catch (Exception ex) { // NOSONAR @@ -2475,9 +2490,9 @@ private void doInvokeBatchOnMessage(final ConsumerRecords records, } private void invokeBatchErrorHandler(final ConsumerRecords records, - @Nullable List> list, RuntimeException rte) { + List> list, RuntimeException rte) { - if (this.commonErrorHandler.seeksAfterHandling() || this.transactionManager != null + if (Objects.requireNonNull(this.commonErrorHandler).seeksAfterHandling() || this.transactionManager != null || rte instanceof CommitFailedException) { this.commonErrorHandler.handleBatch(rte, records, this.consumer, @@ -2547,7 +2562,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords records) { } private void invokeInTransaction(Iterator> iterator, final ConsumerRecord cRecord) { - this.transactionTemplate.execute(new TransactionCallbackWithoutResult() { + Objects.requireNonNull(this.transactionTemplate).execute(new TransactionCallbackWithoutResult() { @Override public void doInTransactionWithoutResult(TransactionStatus s) { @@ -2746,8 +2761,8 @@ private void pauseForNackSleep() { @SuppressWarnings(RAWTYPES) private Producer getTxProducer() { - return ((KafkaResourceHolder) TransactionSynchronizationManager - .getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory())) + return ((KafkaResourceHolder) Objects.requireNonNull(TransactionSynchronizationManager + .getResource(Objects.requireNonNull(ListenerConsumer.this.kafkaTxManager).getProducerFactory()))) .getProducer(); // NOSONAR } @@ -2818,7 +2833,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco } private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord cRecord) { - if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle() && this.consumerGroupId != null) + if ((!this.autoCommit && Objects.requireNonNull(this.commonErrorHandler).isAckAfterHandle() && this.consumerGroupId != null) || this.producer != null) { if (this.remainingRecords == null || !cRecord.equals(this.remainingRecords.iterator().next())) { @@ -2884,17 +2899,17 @@ private void doInvokeOnMessage(final ConsumerRecord recordArg) { try { switch (this.listenerType) { case ACKNOWLEDGING_CONSUMER_AWARE -> - this.listener.onMessage(cRecord, + Objects.requireNonNull(this.listener).onMessage(cRecord, this.isAnyManualAck ? new ConsumerAcknowledgment(cRecord) : null, this.consumer); - case CONSUMER_AWARE -> this.listener.onMessage(cRecord, this.consumer); + case CONSUMER_AWARE -> Objects.requireNonNull(this.listener).onMessage(cRecord, this.consumer); case ACKNOWLEDGING -> - this.listener.onMessage(cRecord, + Objects.requireNonNull(this.listener).onMessage(cRecord, this.isAnyManualAck ? new ConsumerAcknowledgment(cRecord) : null); - case SIMPLE -> this.listener.onMessage(cRecord); + case SIMPLE -> Objects.requireNonNull(this.listener).onMessage(cRecord); } } catch (Exception ex) { // NOSONAR @@ -2906,7 +2921,7 @@ private void doInvokeOnMessage(final ConsumerRecord recordArg) { private void invokeErrorHandlerBySingleRecord(FailedRecordTuple failedRecordTuple) { final ConsumerRecord cRecord = failedRecordTuple.record; RuntimeException rte = failedRecordTuple.ex; - if (this.commonErrorHandler.seeksAfterHandling() || rte instanceof CommitFailedException) { + if (Objects.requireNonNull(this.commonErrorHandler).seeksAfterHandling() || rte instanceof CommitFailedException) { try { if (this.producer == null) { processCommits(); @@ -2944,7 +2959,7 @@ private void invokeErrorHandlerBySingleRecord(FailedRecordTuple failedReco private void invokeErrorHandler(final ConsumerRecord cRecord, Iterator> iterator, RuntimeException rte) { - if (this.commonErrorHandler.seeksAfterHandling() || rte instanceof CommitFailedException) { + if (Objects.requireNonNull(this.commonErrorHandler).seeksAfterHandling() || rte instanceof CommitFailedException) { try { if (this.producer == null) { processCommits(); @@ -2990,7 +3005,8 @@ private void invokeErrorHandler(final ConsumerRecord cRecord, private RuntimeException decorateException(Exception ex) { Exception toHandle = ex; if (toHandle instanceof ListenerExecutionFailedException) { - toHandle = new ListenerExecutionFailedException(toHandle.getMessage(), this.consumerGroupId, + String message = toHandle.getMessage() == null ? "Error occurred" : toHandle.getMessage(); + toHandle = new ListenerExecutionFailedException(message, this.consumerGroupId, toHandle.getCause()); // NOSONAR restored below fixStackTrace(ex, toHandle); } @@ -3073,11 +3089,11 @@ private void sendOffsetsToTransaction() { doSendOffsets(this.producer, commits); } - private void doSendOffsets(Producer prod, Map commits) { + private void doSendOffsets(@Nullable Producer prod, Map commits) { if (CollectionUtils.isEmpty(commits)) { return; } - prod.sendOffsetsToTransaction(commits, this.consumer.groupMetadata()); + Objects.requireNonNull(prod).sendOffsetsToTransaction(commits, this.consumer.groupMetadata()); if (this.fixTxOffsets) { this.lastCommits.putAll(commits); } @@ -3153,12 +3169,15 @@ private void processSeeks() { Function offsetComputeFunction = offset.getOffsetComputeFunction(); if (position == null) { if (offset.isRelativeToCurrent()) { - whereTo += this.consumer.position(topicPartition); + long topicPartitionPosition = this.consumer.position(topicPartition); + Assert.state(whereTo != null, "Current offset must not be null"); + whereTo += topicPartitionPosition; whereTo = Math.max(whereTo, 0); } else if (offsetComputeFunction != null) { whereTo = offsetComputeFunction.apply(this.consumer.position(topicPartition)); } + Assert.state(whereTo != null, "offset to seek cannot be null"); this.consumer.seek(topicPartition, whereTo); } else if (SeekPosition.TIMESTAMP.equals(position)) { @@ -3260,7 +3279,7 @@ private void initPartitionsIfNeeded() { } doInitialSeeks(partitions, beginnings, ends); if (this.consumerSeekAwareListener != null) { - this.consumerSeekAwareListener.onPartitionsAssigned(this.definedPartitions.keySet().stream() + this.consumerSeekAwareListener.onPartitionsAssigned(Objects.requireNonNull(this.definedPartitions).keySet().stream() .map(tp -> new SimpleEntry<>(tp, this.consumer.position(tp))) .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)), this.seekCallback); @@ -3475,7 +3494,7 @@ public void seekToTimestamp(Collection topicParts, long timestam } @Override - public String getGroupId() { + public @Nullable String getGroupId() { return this.consumerGroupId; } @@ -3541,7 +3560,7 @@ private final class ConsumerBatchAcknowledgment implements Acknowledgment { private final ConsumerRecords records; - private final List> recordList; + private final @Nullable List> recordList; private volatile boolean acked; @@ -3566,7 +3585,7 @@ public void acknowledge() { for (TopicPartition topicPartition : this.records.partitions()) { if (offs != null) { offs.remove(topicPartition); - deferred.remove(topicPartition); + Objects.requireNonNull(deferred).remove(topicPartition); } } processAcks(this.records); @@ -3642,10 +3661,10 @@ public String toString() { private class ListenerConsumerRebalanceListener implements ConsumerRebalanceListener { - private final ConsumerRebalanceListener userListener = getContainerProperties() + private final @Nullable ConsumerRebalanceListener userListener = getContainerProperties() .getConsumerRebalanceListener(); - private final ConsumerAwareRebalanceListener consumerAwareListener = + private final @Nullable ConsumerAwareRebalanceListener consumerAwareListener = this.userListener instanceof ConsumerAwareRebalanceListener carl ? carl : null; private final Collection revoked = new LinkedList<>(); @@ -3662,7 +3681,7 @@ public void onPartitionsRevoked(Collection partitions) { partitions); } else { - this.userListener.onPartitionsRevoked(partitions); + Objects.requireNonNull(this.userListener).onPartitionsRevoked(partitions); } try { // Wait until now to commit, in case the user listener added acks @@ -3691,7 +3710,7 @@ public void onPartitionsRevoked(Collection partitions) { if (pendingOffsets != null) { partitions.forEach(tp -> { pendingOffsets.remove(tp); - ListenerConsumer.this.deferredOffsets.remove(tp); + Objects.requireNonNull(ListenerConsumer.this.deferredOffsets).remove(tp); }); if (pendingOffsets.isEmpty()) { ListenerConsumer.this.consumerPaused = false; @@ -3732,7 +3751,7 @@ public void onPartitionsAssigned(Collection partitions) { this.consumerAwareListener.onPartitionsAssigned(ListenerConsumer.this.consumer, partitions); } else { - this.userListener.onPartitionsAssigned(partitions); + Objects.requireNonNull(this.userListener).onPartitionsAssigned(partitions); } if (!ListenerConsumer.this.firstPoll && ListenerConsumer.this.definedPartitions == null && ListenerConsumer.this.consumerSeekAwareListener != null) { @@ -3858,7 +3877,7 @@ public void onPartitionsLost(Collection partitions) { this.consumerAwareListener.onPartitionsLost(ListenerConsumer.this.consumer, partitions); } else { - this.userListener.onPartitionsLost(partitions); + Objects.requireNonNull(this.userListener).onPartitionsLost(partitions); } onPartitionsRevoked(partitions); } @@ -3995,12 +4014,12 @@ private Long computeBackwardWhereTo(long offset, boolean toCurrent, TopicPartiti * @param relativeToCurrent relative to current. * @param seekPosition seek position strategy. */ - private record OffsetMetadata(Long offset, boolean relativeToCurrent, SeekPosition seekPosition) { + private record OffsetMetadata(@Nullable Long offset, boolean relativeToCurrent, @Nullable SeekPosition seekPosition) { } private class StopCallback implements BiConsumer { - private final Runnable callback; + private final @Nullable Runnable callback; StopCallback(@Nullable Runnable callback) { this.callback = callback; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerPauseService.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerPauseService.java index ccab45bdee..c2c0d7deb8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerPauseService.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerPauseService.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneId; +import java.util.Objects; import java.util.Optional; import org.apache.commons.logging.LogFactory; @@ -140,7 +141,7 @@ public void resume(MessageListenerContainer messageListenerContainer) { * Callers must ensure this.registry is not null before calling. */ private Optional getListenerContainer(String listenerId) { - MessageListenerContainer messageListenerContainer = this.registry.getListenerContainer(listenerId); // NOSONAR + MessageListenerContainer messageListenerContainer = Objects.requireNonNull(this.registry).getListenerContainer(listenerId); // NOSONAR if (messageListenerContainer == null) { LOGGER.warn(() -> "MessageListenerContainer " + listenerId + " does not exists"); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerExecutionFailedException.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerExecutionFailedException.java index 57576dd3f5..ba033dbb1c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerExecutionFailedException.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerExecutionFailedException.java @@ -28,7 +28,7 @@ @SuppressWarnings("serial") public class ListenerExecutionFailedException extends KafkaException { - private final String groupId; + private final @Nullable String groupId; /** * Construct an instance with the provided properties. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java index 4b2d1e7b79..aa7dbcb6ee 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2024 the original author or authors. + * Copyright 2017-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,9 +17,11 @@ package org.springframework.kafka.listener; import java.util.Map; +import java.util.Objects; import java.util.function.Supplier; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.jspecify.annotations.Nullable; import org.springframework.util.Assert; import org.springframework.util.backoff.BackOff; @@ -124,7 +126,7 @@ public static void unrecoverableBackOff(BackOff backOff, ThreadLocal executions, - Map lastIntervals, MessageListenerContainer container) throws InterruptedException { + Map lastIntervals, @Nullable MessageListenerContainer container) throws InterruptedException { Thread currentThread = Thread.currentThread(); Long interval = nextBackOff(backOff, executions); @@ -158,8 +160,8 @@ static long nextBackOff(BackOff backOff, Map execution * @throws InterruptedException if the thread is interrupted. * @since 2.7 */ - public static void stoppableSleep(MessageListenerContainer container, long interval) throws InterruptedException { - conditionalSleep(container::isRunning, interval); + public static void stoppableSleep(@Nullable MessageListenerContainer container, long interval) throws InterruptedException { + conditionalSleep(Objects.requireNonNull(container)::isRunning, interval); } /** @@ -188,8 +190,9 @@ public static void conditionalSleep(Supplier shouldSleepCondition, long * @return an offset and metadata. * @since 2.8.6 */ - public static OffsetAndMetadata createOffsetAndMetadata(MessageListenerContainer container, + public static OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListenerContainer container, long offset) { + Assert.state(container != null, "Container cannot be null"); final OffsetAndMetadataProvider metadataProvider = container.getContainerProperties() .getOffsetAndMetadataProvider(); if (metadataProvider != null) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ManualAckListenerErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ManualAckListenerErrorHandler.java index 1cede72095..702566fc9e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ManualAckListenerErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ManualAckListenerErrorHandler.java @@ -38,7 +38,7 @@ default Object handleError(Message message, ListenerExecutionFailedException } @Override - Object handleError(Message message, ListenerExecutionFailedException exception, Consumer consumer, + Object handleError(Message message, ListenerExecutionFailedException exception, @Nullable Consumer consumer, @Nullable Acknowledgment ack); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/RecoveryStrategy.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecoveryStrategy.java index cf8378572b..92e428a5d3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/RecoveryStrategy.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecoveryStrategy.java @@ -39,7 +39,7 @@ public interface RecoveryStrategy { * @return true to skip. * @throws InterruptedException if the thread is interrupted. */ - boolean recovered(ConsumerRecord record, Exception ex, @Nullable MessageListenerContainer container, + boolean recovered(ConsumerRecord record, @Nullable Exception ex, @Nullable MessageListenerContainer container, @Nullable Consumer consumer) throws InterruptedException; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/RetryListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/RetryListener.java index 9c8bc80c06..4a84fc03e4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/RetryListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/RetryListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.jspecify.annotations.Nullable; /** * A listener for retry activity. @@ -35,14 +36,14 @@ public interface RetryListener { * @param ex the exception. * @param deliveryAttempt the delivery attempt. */ - void failedDelivery(ConsumerRecord record, Exception ex, int deliveryAttempt); + void failedDelivery(ConsumerRecord record, @Nullable Exception ex, int deliveryAttempt); /** * Called after a failing record was successfully recovered. * @param record the record. * @param ex the exception. */ - default void recovered(ConsumerRecord record, Exception ex) { + default void recovered(ConsumerRecord record, @Nullable Exception ex) { } /** @@ -51,7 +52,7 @@ default void recovered(ConsumerRecord record, Exception ex) { * @param original the original exception causing the recovery attempt. * @param failure the exception thrown by the recoverer. */ - default void recoveryFailed(ConsumerRecord record, Exception original, Exception failure) { + default void recoveryFailed(ConsumerRecord record, @Nullable Exception original, Exception failure) { } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java index 9169c1dae8..50d6d03b01 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java @@ -77,7 +77,7 @@ private SeekUtils() { * @param logger a {@link LogAccessor} for seek errors. * @return true if the failed record was skipped. */ - public static boolean doSeeks(List> records, Consumer consumer, Exception exception, + public static boolean doSeeks(List> records, Consumer consumer, @Nullable Exception exception, boolean recoverable, BiPredicate, Exception> skipper, LogAccessor logger) { return doSeeks(records, consumer, exception, recoverable, (rec, ex, cont, cons) -> skipper.test(rec, ex), null, @@ -95,7 +95,7 @@ public static boolean doSeeks(List> records, Consumer * @param logger a {@link LogAccessor} for seek errors. * @return true if the failed record was skipped. */ - public static boolean doSeeks(List> records, Consumer consumer, Exception exception, + public static boolean doSeeks(List> records, Consumer consumer, @Nullable Exception exception, boolean recoverable, RecoveryStrategy recovery, @Nullable MessageListenerContainer container, LogAccessor logger) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractDelegatingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractDelegatingMessageListenerAdapter.java index e393409b27..b6267b3171 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractDelegatingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractDelegatingMessageListenerAdapter.java @@ -80,7 +80,7 @@ public void onPartitionsAssigned(Map assignments, Consumer } @Override - public void onPartitionsRevoked(Collection partitions) { + public void onPartitionsRevoked(@Nullable Collection partitions) { if (this.seekAware != null) { this.seekAware.onPartitionsRevoked(partitions); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java index fe70508211..abc65aef96 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java @@ -139,7 +139,7 @@ public void onMessage(ConsumerRecords records, @Nullable Acknowledgment ac */ @Override public void onMessage(List> records, @Nullable Acknowledgment acknowledgment, - Consumer consumer) { + @Nullable Consumer consumer) { Message message; if (!isConsumerRecordList()) { @@ -170,7 +170,7 @@ public void onMessage(List> records, @Nullable Acknowledgme @SuppressWarnings({ "unchecked", "rawtypes" }) protected Message toMessagingMessage(List records, @Nullable Acknowledgment acknowledgment, - Consumer consumer) { + @Nullable Consumer consumer) { return getBatchMessageConverter().toMessage(records, acknowledgment, consumer, getType()); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchToRecordAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchToRecordAdapter.java index 9bea4598f5..eda41ee83b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchToRecordAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchToRecordAdapter.java @@ -49,7 +49,7 @@ public interface BatchToRecordAdapter { * @param callback the callback. */ void adapt(List> messages, List> records, @Nullable Acknowledgment ack, - Consumer consumer, Callback callback); + @Nullable Consumer consumer, Callback callback); /** * A callback for each message. @@ -67,7 +67,7 @@ interface Callback { * @param consumer the consumer. * @param message the message. */ - void invoke(ConsumerRecord record, @Nullable Acknowledgment ack, Consumer consumer, + void invoke(ConsumerRecord record, @Nullable Acknowledgment ack, @Nullable Consumer consumer, Message message); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ConvertingMessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ConvertingMessageListener.java index 2e7d4cd6bb..8d1e63a72c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ConvertingMessageListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ConvertingMessageListener.java @@ -107,7 +107,7 @@ public MessageListener getDelegate() { @Override @SuppressWarnings("unchecked") - public void onMessage(ConsumerRecord receivedRecord, @Nullable Acknowledgment acknowledgment, Consumer consumer) { + public void onMessage(ConsumerRecord receivedRecord, @Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer) { ConsumerRecord convertedConsumerRecord = convertConsumerRecord(receivedRecord); if (this.delegate instanceof AcknowledgingConsumerAwareMessageListener) { this.delegate.onMessage(convertedConsumerRecord, acknowledgment, consumer); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DefaultBatchToRecordAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DefaultBatchToRecordAdapter.java index 0f0cfc9c8d..eb24849f5f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DefaultBatchToRecordAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DefaultBatchToRecordAdapter.java @@ -64,7 +64,7 @@ public DefaultBatchToRecordAdapter(ConsumerRecordRecoverer recoverer) { @Override public void adapt(List> messages, List> records, @Nullable Acknowledgment ack, - Consumer consumer, Callback callback) { + @Nullable Consumer consumer, Callback callback) { for (int i = 0; i < messages.size(); i++) { Message message = messages.get(i); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 0b038b5614..fa4d5f1753 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -392,7 +392,7 @@ public void onPartitionsAssigned(Map assignments, Consumer } @Override - public void onPartitionsRevoked(Collection partitions) { + public void onPartitionsRevoked(@Nullable Collection partitions) { if (this.bean instanceof ConsumerSeekAware csa) { csa.onPartitionsRevoked(partitions); } @@ -406,12 +406,12 @@ public void onIdleContainer(Map assignments, ConsumerSeekC } protected Message toMessagingMessage(ConsumerRecord cRecord, @Nullable Acknowledgment acknowledgment, - Consumer consumer) { + @Nullable Consumer consumer) { return getMessageConverter().toMessage(cRecord, acknowledgment, consumer, getType()); } - protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer consumer, + protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer, final Message message) { Throwable listenerError = null; @@ -455,7 +455,7 @@ private Observation getCurrentObservation() { */ @Nullable protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message message, - Consumer consumer) { + @Nullable Consumer consumer) { Acknowledgment ack = acknowledgment; if (ack == null && this.noOpAck) { @@ -511,7 +511,7 @@ private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Me * {@code o.s.messaging.Message}; may be null */ protected void handleResult(Object resultArg, Object request, @Nullable Acknowledgment acknowledgment, - Consumer consumer, @Nullable Message source) { + @Nullable Consumer consumer, @Nullable Message source) { final Observation observation = getCurrentObservation(); this.logger.debug(() -> "Listener method returned result [" + resultArg + "] - generating response message for it"); @@ -724,7 +724,7 @@ protected void acknowledge(@Nullable Acknowledgment acknowledgment) { } @SuppressWarnings("NullAway") // Dataflow analysis limitation - protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment, Consumer consumer, + protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer, @Nullable Throwable t, @Nullable Message source) { try { @@ -749,7 +749,7 @@ private static boolean canAsyncRetry(Object request, Throwable exception) { return request instanceof ConsumerRecord && exception instanceof RuntimeException; } - protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer consumer, + protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, @Nullable Consumer consumer, @Nullable Message message, ListenerExecutionFailedException e) { if (this.errorHandler != null) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java index 9fbf4af735..4427f93d16 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java @@ -70,7 +70,7 @@ public RecordMessagingMessageListenerAdapter(Object bean, Method method, */ @Override public void onMessage(ConsumerRecord record, @Nullable Acknowledgment acknowledgment, - Consumer consumer) { + @Nullable Consumer consumer) { Message message; if (isConversionNeeded()) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/package-info.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/package-info.java index cf635f74b6..4c2b4c5e68 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/package-info.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/package-info.java @@ -1,5 +1,5 @@ /** * Package for kafka listeners */ -@org.springframework.lang.NonNullApi +@org.jspecify.annotations.NullMarked package org.springframework.kafka.listener; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java index 69d6a39301..c9ee6a4f7d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2023 the original author or authors. + * Copyright 2019-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,6 +34,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; +import org.jspecify.annotations.Nullable; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.BatchConsumerAwareMessageListener; @@ -125,7 +126,7 @@ public synchronized void setReturnPartialOnTimeout(boolean returnPartialOnTimeou } @Override - public void onMessage(List>>> data, Consumer consumer) { + public void onMessage(List>>> data, @Nullable Consumer consumer) { List>>> completed = new ArrayList<>(); String correlationHeaderName = getCorrelationHeaderName(); data.forEach(record -> { @@ -191,11 +192,11 @@ protected boolean handleTimeout(Object correlationId, } } - private void checkOffsetsAndCommitIfNecessary(List> list, Consumer consumer) { + private void checkOffsetsAndCommitIfNecessary(List> list, @Nullable Consumer consumer) { list.forEach(record -> this.offsets.compute( new TopicPartition(record.topic(), record.partition()), (k, v) -> v == null ? record.offset() + 1 : Math.max(v, record.offset() + 1))); - if (this.pending.isEmpty() && !this.offsets.isEmpty()) { + if (this.pending.isEmpty() && !this.offsets.isEmpty() && consumer != null) { consumer.commitSync(this.offsets.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> new OffsetAndMetadata(entry.getValue()))), diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java index 197ab114c9..dd272ebaff 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java @@ -219,7 +219,7 @@ public void setAutoStartup(boolean autoStartup) { * Return the topics/partitions assigned to the replying listener container. * @return the topics/partitions. */ - public Collection getAssignedReplyTopicPartitions() { + public @Nullable Collection getAssignedReplyTopicPartitions() { return this.replyContainer.getAssignedPartitions(); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessageConverter.java index 1f5094e8f0..dddd50daad 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessageConverter.java @@ -46,7 +46,7 @@ public interface BatchMessageConverter extends MessageConverter { */ @NonNull Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, - Consumer consumer, Type payloadType); + @Nullable Consumer consumer, Type payloadType); /** * Convert a message to a producer record. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index b7db7afadb..c284b2581c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -148,7 +148,7 @@ public void setRawRecordHeader(boolean rawRecordHeader) { @Override // NOSONAR public Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, - Consumer consumer, Type type) { + @Nullable Consumer consumer, Type type) { KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp); diff --git a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt index f8604cdeae..07248440ba 100644 --- a/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt +++ b/spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt @@ -203,12 +203,12 @@ class EnableKafkaKotlinTests { ConcurrentMessageListenerContainer { val container = kafkaListenerContainerFactory.createContainer("kotlinTestTopic2") - container.containerProperties.groupId = "checkedEx" - container.containerProperties.messageListener = MessageListener { + container.containerProperties.setGroupId("checkedEx") + container.containerProperties.setMessageListener(MessageListener { if (it.value() == "fail") { throw Exception("checked") } - } + }) return container; } @@ -218,12 +218,12 @@ class EnableKafkaKotlinTests { ConcurrentMessageListenerContainer { val container = kafkaBatchListenerContainerFactory.createContainer("kotlinBatchTestTopic2") - container.containerProperties.groupId = "batchCheckedEx" - container.containerProperties.messageListener = BatchMessageListener { + container.containerProperties.setGroupId("batchCheckedEx") + container.containerProperties.setMessageListener(BatchMessageListener { if (it.first().value() == "fail") { throw Exception("checked") } - } + }) return container; }