Skip to content

Commit 6d45eb0

Browse files
garyrussellartembilan
authored andcommitted
GH-522: Fix NPE in listener container (#524)
Fixes #522 * Polishing - PR Comment **Cherry-pick to 2.0.x & 1.3.x**
1 parent b7acbb3 commit 6d45eb0

File tree

1 file changed

+15
-8
lines changed

1 file changed

+15
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
9696

9797
private final TopicPartitionInitialOffset[] topicPartitions;
9898

99-
private ListenerConsumer listenerConsumer;
99+
private volatile ListenerConsumer listenerConsumer;
100100

101-
private ListenableFuture<?> listenerConsumerFuture;
101+
private volatile ListenableFuture<?> listenerConsumerFuture;
102102

103103
private GenericMessageListener<?> listener;
104104

@@ -151,11 +151,17 @@ public void setClientIdSuffix(String clientIdSuffix) {
151151
* either explicitly or by Kafka; may be null if not assigned yet.
152152
*/
153153
public Collection<TopicPartition> getAssignedPartitions() {
154-
if (this.listenerConsumer.definedPartitions != null) {
155-
return Collections.unmodifiableCollection(this.listenerConsumer.definedPartitions.keySet());
156-
}
157-
else if (this.listenerConsumer.assignedPartitions != null) {
158-
return Collections.unmodifiableCollection(this.listenerConsumer.assignedPartitions);
154+
ListenerConsumer listenerConsumer = this.listenerConsumer;
155+
if (listenerConsumer != null) {
156+
if (listenerConsumer.definedPartitions != null) {
157+
return Collections.unmodifiableCollection(listenerConsumer.definedPartitions.keySet());
158+
}
159+
else if (listenerConsumer.assignedPartitions != null) {
160+
return Collections.unmodifiableCollection(listenerConsumer.assignedPartitions);
161+
}
162+
else {
163+
return null;
164+
}
159165
}
160166
else {
161167
return null;
@@ -265,7 +271,8 @@ private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<
265271
public String toString() {
266272
return "KafkaMessageListenerContainer [id=" + getBeanName()
267273
+ (this.clientIdSuffix != null ? ", clientIndex=" + this.clientIdSuffix : "")
268-
+ ", topicPartitions=" + getAssignedPartitions()
274+
+ ", topicPartitions="
275+
+ (getAssignedPartitions() == null ? "none assigned" : getAssignedPartitions())
269276
+ "]";
270277
}
271278

0 commit comments

Comments
 (0)