Skip to content

Commit c91908c

Browse files
committed
GH-522: Fix NPE in listener container
Fixes #522
1 parent 0262d87 commit c91908c

File tree

1 file changed

+13
-7
lines changed

1 file changed

+13
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
9898

9999
private final TopicPartitionInitialOffset[] topicPartitions;
100100

101-
private ListenerConsumer listenerConsumer;
101+
private volatile ListenerConsumer listenerConsumer;
102102

103-
private ListenableFuture<?> listenerConsumerFuture;
103+
private volatile ListenableFuture<?> listenerConsumerFuture;
104104

105105
private GenericMessageListener<?> listener;
106106

@@ -180,11 +180,17 @@ public void setClientIdSuffix(String clientIdSuffix) {
180180
* either explicitly or by Kafka; may be null if not assigned yet.
181181
*/
182182
public Collection<TopicPartition> getAssignedPartitions() {
183-
if (this.listenerConsumer.definedPartitions != null) {
184-
return Collections.unmodifiableCollection(this.listenerConsumer.definedPartitions.keySet());
185-
}
186-
else if (this.listenerConsumer.assignedPartitions != null) {
187-
return Collections.unmodifiableCollection(this.listenerConsumer.assignedPartitions);
183+
ListenerConsumer listenerConsumer = this.listenerConsumer;
184+
if (listenerConsumer != null) {
185+
if (listenerConsumer.definedPartitions != null) {
186+
return Collections.unmodifiableCollection(listenerConsumer.definedPartitions.keySet());
187+
}
188+
else if (listenerConsumer.assignedPartitions != null) {
189+
return Collections.unmodifiableCollection(listenerConsumer.assignedPartitions);
190+
}
191+
else {
192+
return null;
193+
}
188194
}
189195
else {
190196
return null;

0 commit comments

Comments
 (0)