Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2024 the original author or authors.
* Copyright 2022-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,8 +60,8 @@ private fun createContainer(
factory: ConcurrentKafkaListenerContainerFactory<String, String>, topic: String, group: String
): ConcurrentMessageListenerContainer<String, String> {
val container = factory.createContainer(topic)
container.containerProperties.messageListener = MyListener()
container.containerProperties.groupId = group
container.containerProperties.setMessageListener(MyListener())
container.containerProperties.setGroupId(group)
container.beanName = group
container.start()
return container
Expand Down Expand Up @@ -104,9 +104,10 @@ fun pojo(id: String, topic: String): MyPojo {

// tag::listener[]

class MyListener : MessageListener<String?, String?> {
class MyListener : MessageListener<String, String> {

override fun onMessage(data: ConsumerRecord<String, String>) {

override fun onMessage(data: ConsumerRecord<String?, String?>) {
// ...
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class Application {
factory: ConcurrentKafkaListenerContainerFactory<String, String>
): ReplyingKafkaTemplate<String, String, String> {
val replyContainer = factory.createContainer("replies")
replyContainer.containerProperties.groupId = "request.replies"
replyContainer.containerProperties.setGroupId("request.replies")
val template = ReplyingKafkaTemplate<String, String, String>(pf, replyContainer)
template.messageConverter = ByteArrayJsonMessageConverter()
template.setDefaultTopic("requests")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ interface ProducerCallback<K, V, T> {
*/
interface OperationsCallback<K, V, T> {

@Nullable
T doInOperations(KafkaOperations<K, V> operations);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ public <T> T execute(ProducerCallback<K, V, T> callback) {
}

@Override
public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
public <T> @Nullable T executeInTransaction(OperationsCallback<K, V, T> callback) {
Assert.notNull(callback, "'callback' cannot be null");
Assert.state(this.transactional, "Producer factory does not support transactions");
Thread currentThread = Thread.currentThread();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.jspecify.annotations.Nullable;

/**
* An event published when a consumer is stopped. While it is best practice to use
Expand All @@ -37,7 +38,7 @@ public class ConsumerStoppingEvent extends KafkaEvent {

private transient final Consumer<?, ?> consumer;

private transient final Collection<TopicPartition> partitions;
private transient final @Nullable Collection<TopicPartition> partitions;

/**
* Construct an instance with the provided source, consumer and partitions.
Expand All @@ -48,7 +49,7 @@ public class ConsumerStoppingEvent extends KafkaEvent {
* @since 2.2.1
*/
public ConsumerStoppingEvent(Object source, Object container,
Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
Consumer<?, ?> consumer, @Nullable Collection<TopicPartition> partitions) {
super(source, container);
this.consumer = consumer;
this.partitions = partitions;
Expand All @@ -58,7 +59,7 @@ public ConsumerStoppingEvent(Object source, Object container,
return this.consumer;
}

public Collection<TopicPartition> getPartitions() {
public @Nullable Collection<TopicPartition> getPartitions() {
return this.partitions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,23 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
}

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> {
List<ConsumerSeekCallback> removedCallbacks = this.topicToCallbacks.remove(tp);
if (removedCallbacks != null && !removedCallbacks.isEmpty()) {
removedCallbacks.forEach(cb -> {
List<TopicPartition> topics = this.callbackToTopics.get(cb);
if (topics != null) {
topics.remove(tp);
if (topics.isEmpty()) {
this.callbackToTopics.remove(cb);
public void onPartitionsRevoked(@Nullable Collection<TopicPartition> partitions) {
if (partitions != null) {
partitions.forEach(tp -> {
List<ConsumerSeekCallback> removedCallbacks = this.topicToCallbacks.remove(tp);
if (removedCallbacks != null && !removedCallbacks.isEmpty()) {
removedCallbacks.forEach(cb -> {
List<TopicPartition> topics = this.callbackToTopics.get(cb);
if (topics != null) {
topics.remove(tp);
if (topics.isEmpty()) {
this.callbackToTopics.remove(cb);
}
}
}
});
}
});
});
}
});
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,10 @@

package org.springframework.kafka.listener;

import java.util.Objects;

import org.jspecify.annotations.Nullable;

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
Expand All @@ -32,9 +36,9 @@
public abstract class AbstractKafkaBackOffManagerFactory
implements KafkaBackOffManagerFactory, ApplicationContextAware {

private ApplicationContext applicationContext;
private @Nullable ApplicationContext applicationContext;

private ListenerContainerRegistry listenerContainerRegistry;
private @Nullable ListenerContainerRegistry listenerContainerRegistry;

/**
* Creates an instance that will retrieve the {@link ListenerContainerRegistry} from
Expand Down Expand Up @@ -83,7 +87,7 @@ private ListenerContainerRegistry getListenerContainerFromContext() {
}

protected <T> T getBean(String beanName, Class<T> beanClass) {
return this.applicationContext.getBean(beanName, beanClass);
return Objects.requireNonNull(this.applicationContext).getBean(beanName, beanClass);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ public abstract class AbstractMessageListenerContainer<K, V>
@NonNull
private String beanName = "noBeanNameSet";

private ApplicationEventPublisher applicationEventPublisher;
private @Nullable ApplicationEventPublisher applicationEventPublisher;

private CommonErrorHandler commonErrorHandler;
private @Nullable CommonErrorHandler commonErrorHandler;

private boolean autoStartup = true;

Expand All @@ -114,15 +114,16 @@ public abstract class AbstractMessageListenerContainer<K, V>

private int topicCheckTimeout = DEFAULT_TOPIC_CHECK_TIMEOUT;

private RecordInterceptor<K, V> recordInterceptor;
private @Nullable RecordInterceptor<K, V> recordInterceptor;

private BatchInterceptor<K, V> batchInterceptor;
private @Nullable BatchInterceptor<K, V> batchInterceptor;

private boolean interceptBeforeTx = true;

@SuppressWarnings("NullAway.Init")
private byte[] listenerInfo;

private ApplicationContext applicationContext;
private @Nullable ApplicationContext applicationContext;

private volatile boolean running = false;

Expand All @@ -149,13 +150,13 @@ public abstract class AbstractMessageListenerContainer<K, V>
* @param containerProperties the properties.
*/
@SuppressWarnings("unchecked")
protected AbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory,
protected AbstractMessageListenerContainer(@Nullable ConsumerFactory<? super K, ? super V> consumerFactory,
ContainerProperties containerProperties) {

Assert.notNull(containerProperties, "'containerProperties' cannot be null");
Assert.notNull(consumerFactory, "'consumerFactory' cannot be null");
this.consumerFactory = (ConsumerFactory<K, V>) consumerFactory;
String[] topics = containerProperties.getTopics();
@Nullable String @Nullable [] topics = containerProperties.getTopics();
if (topics != null) {
this.containerProperties = new ContainerProperties(topics);
}
Expand All @@ -165,7 +166,7 @@ protected AbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V>
this.containerProperties = new ContainerProperties(topicPattern);
}
else {
TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
@Nullable TopicPartitionOffset @Nullable [] topicPartitions = containerProperties.getTopicPartitions();
if (topicPartitions != null) {
this.containerProperties = new ContainerProperties(topicPartitions);
}
Expand Down Expand Up @@ -370,8 +371,8 @@ public String getMainListenerId() {
return this.mainListenerId;
}

@Nullable
@Override
@SuppressWarnings("NullAway") // Dataflow analysis limitation
public byte[] getListenerInfo() {
return this.listenerInfo != null ? Arrays.copyOf(this.listenerInfo, this.listenerInfo.length) : null;
}
Expand All @@ -382,6 +383,7 @@ public byte[] getListenerInfo() {
* @param listenerInfo the info.
* @since 2.8.4
*/
@SuppressWarnings("NullAway") // Dataflow analysis limitation
public void setListenerInfo(@Nullable byte[] listenerInfo) {
this.listenerInfo = listenerInfo != null ? Arrays.copyOf(listenerInfo, listenerInfo.length) : null;
}
Expand Down Expand Up @@ -458,7 +460,7 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
this.kafkaAdmin = kafkaAdmin;
}

protected RecordInterceptor<K, V> getRecordInterceptor() {
protected @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
return this.recordInterceptor;
}

Expand All @@ -469,11 +471,11 @@ protected RecordInterceptor<K, V> getRecordInterceptor() {
* @since 2.2.7
* @see #setInterceptBeforeTx(boolean)
*/
public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
public void setRecordInterceptor(@Nullable RecordInterceptor<K, V> recordInterceptor) {
this.recordInterceptor = recordInterceptor;
}

protected BatchInterceptor<K, V> getBatchInterceptor() {
protected @Nullable BatchInterceptor<K, V> getBatchInterceptor() {
return this.batchInterceptor;
}

Expand All @@ -483,7 +485,7 @@ protected BatchInterceptor<K, V> getBatchInterceptor() {
* @since 2.6.6
* @see #setInterceptBeforeTx(boolean)
*/
public void setBatchInterceptor(BatchInterceptor<K, V> batchInterceptor) {
public void setBatchInterceptor(@Nullable BatchInterceptor<K, V> batchInterceptor) {
this.batchInterceptor = batchInterceptor;
}

Expand Down Expand Up @@ -541,7 +543,7 @@ protected void checkTopics() {
List<String> missing = null;
try (AdminClient client = AdminClient.create(configs)) { // NOSONAR - false positive null check
if (client != null) {
String[] topics = this.containerProperties.getTopics();
@Nullable String @Nullable[] topics = this.containerProperties.getTopics();
if (topics == null) {
topics = Arrays.stream(this.containerProperties.getTopicPartitions())
.map(TopicPartitionOffset::getTopic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ default void onMessage(ConsumerRecord<K, V> data) {
}

@Override
void onMessage(ConsumerRecord<K, V> data, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer);
void onMessage(ConsumerRecord<K, V> data, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer);

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface BackOffHandler {
* @param exception the exception.
* @param nextBackOff the next back off.
*/
default void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) {
default void onNextBackOff(@Nullable MessageListenerContainer container, @Nullable Exception exception, long nextBackOff) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ default void onMessage(List<ConsumerRecord<K, V>> data) {
}

@Override
void onMessage(List<ConsumerRecord<K, V>> data, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer);
void onMessage(List<ConsumerRecord<K, V>> data, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jspecify.annotations.Nullable;

import org.springframework.kafka.support.Acknowledgment;

Expand Down Expand Up @@ -49,6 +50,6 @@ default void onMessage(List<ConsumerRecord<K, V>> data) {
}

@Override
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
void onMessage(List<ConsumerRecord<K, V>> data, @Nullable Acknowledgment acknowledgment);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jspecify.annotations.Nullable;

/**
* Listener for handling a batch of incoming Kafka messages; the list
Expand Down Expand Up @@ -47,6 +48,6 @@ default void onMessage(List<ConsumerRecord<K, V>> data) {
}

@Override
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
void onMessage(List<ConsumerRecord<K, V>> data, @Nullable Consumer<?, ?> consumer);

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class BatchListenerFailedException extends KafkaException {

private final int index;

private transient ConsumerRecord<?, ?> record;
private transient @Nullable ConsumerRecord<?, ?> record;

/**
* Construct an instance with the provided properties.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.jspecify.annotations.Nullable;

import org.springframework.util.Assert;

Expand Down Expand Up @@ -53,7 +54,7 @@ public CompositeBatchInterceptor(BatchInterceptor<K, V>... delegates) {
}

@Override
public ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
public @Nullable ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
ConsumerRecords<K, V> recordsToIntercept = records;
for (BatchInterceptor<K, V> delegate : this.delegates) {
recordsToIntercept = delegate.intercept(recordsToIntercept, consumer);
Expand Down
Loading