From dc6c5724c7d3f39e1e7473ab8837fcab932d66b7 Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Fri, 23 Feb 2024 11:16:30 +0800 Subject: [PATCH] ConcurrentMessageListenerContainer#isInExpectedState consistency problem MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit change stream to for loops, prevent `ConcurrentModificationException` simultaneously invoke `KafkaMessageListenerContainer#setStoppedNormally` and `ConcurrentMessageListenerContainer#isInExpectedState` —— modify property isInExpectedState when invoke stream.allMatch cause list size change threw `ConcurrentModificationException` by `ArrayList$ArrayListSpliterator#tryAdvance`. --- .../ConcurrentMessageListenerContainer.java | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java index 45d9dd85a6..b8883ad935 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; @@ -121,12 +120,12 @@ public void setAlwaysClientIdSuffix(boolean alwaysClientIdSuffix) { public List> getContainers() { this.lifecycleLock.lock(); try { - return Collections.unmodifiableList(new ArrayList<>(this.containers)); + return List.copyOf(this.containers); } finally { this.lifecycleLock.unlock(); } -} + } @Override public MessageListenerContainer getContainerFor(String topic, int partition) { @@ -157,7 +156,7 @@ public Collection getAssignedPartitions() { .map(KafkaMessageListenerContainer::getAssignedPartitions) .filter(Objects::nonNull) .flatMap(Collection::stream) - .collect(Collectors.toList()); + .toList(); } finally { this.lifecycleLock.unlock(); @@ -259,7 +258,6 @@ protected void doStart() { } } - @SuppressWarnings("deprecation") private void configureChildContainer(int index, KafkaMessageListenerContainer container) { String beanName = getBeanName(); beanName = (beanName == null ? "consumer" : beanName) + "-" + index; @@ -308,13 +306,17 @@ private KafkaMessageListenerContainer constructContainer(ContainerProperti return container; } + @Nullable private TopicPartitionOffset[] partitionSubset(ContainerProperties containerProperties, int index) { TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions(); + if (topicPartitions == null) { + return null; + } if (this.concurrency == 1) { - return topicPartitions; // NOSONAR + return topicPartitions; } else { - int numPartitions = topicPartitions.length; // NOSONAR + int numPartitions = topicPartitions.length; if (numPartitions == this.concurrency) { return new TopicPartitionOffset[] { topicPartitions[index] }; } @@ -389,7 +391,7 @@ && getContainerProperties().isRestartAfterAuthExceptions() if (exec == null) { exec = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart"); } - exec.execute(() -> start()); + exec.execute(this::start); } } @@ -477,10 +479,15 @@ public boolean isPartitionPaused(TopicPartition topicPartition) { public boolean isInExpectedState() { this.lifecycleLock.lock(); try { - return (isRunning() || isStoppedNormally()) && this.containers - .stream() - .map(container -> container.isInExpectedState()) - .allMatch(bool -> Boolean.TRUE.equals(bool)); + boolean isInExpectedState = isRunning() || isStoppedNormally(); + if (isInExpectedState) { + for (KafkaMessageListenerContainer container : this.containers) { + if (!container.isInExpectedState()) { + return false; + } + } + } + return isInExpectedState; } finally { this.lifecycleLock.unlock();