From a9b193fae49100f9b23cc5139e8d248eb6c22322 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 4 Jan 2022 13:38:38 -0500 Subject: [PATCH] GH-2055: Containers Must Implement DisposableBean Resolves https://github.com/spring-projects/spring-kafka/issues/2055 If context initialization fails, `Lifecycle.stop()` is not called. Containers must be stopped from `DisposableBean` in this case. **cherry-pick to 2.7.x, 2.6.x, 2.5.x** --- build.gradle | 2 +- .../kafka/config/KafkaListenerEndpointRegistry.java | 11 ++--------- .../kafka/listener/MessageListenerContainer.java | 10 ++++++++-- .../listener/KafkaMessageListenerContainerTests.java | 5 +++-- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/build.gradle b/build.gradle index a08e4a0add..658bd005d8 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,5 @@ buildscript { - ext.kotlinVersion = '1.4.32' + ext.kotlinVersion = '1.6.10' repositories { mavenCentral() maven { url 'https://plugins.gradle.org/m2' } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java index 09990e8203..355ca9dfd9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2021 the original author or authors. + * Copyright 2014-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -253,14 +253,7 @@ protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint @Override public void destroy() { for (MessageListenerContainer listenerContainer : getListenerContainers()) { - if (listenerContainer instanceof DisposableBean) { - try { - ((DisposableBean) listenerContainer).destroy(); - } - catch (Exception ex) { - this.logger.warn(ex, "Failed to destroy message listener container"); - } - } + listenerContainer.destroy(); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java index d81dd00ed8..27b5f76214 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.springframework.beans.factory.DisposableBean; import org.springframework.context.SmartLifecycle; import org.springframework.lang.Nullable; @@ -35,7 +36,7 @@ * @author Vladimir Tsanev * @author Tomaz Fernandes */ -public interface MessageListenerContainer extends SmartLifecycle { +public interface MessageListenerContainer extends SmartLifecycle, DisposableBean { /** * Setup the message listener to use. Throws an {@link IllegalArgumentException} @@ -225,4 +226,9 @@ default void stopAbnormally(Runnable callback) { stop(callback); } + @Override + default void destroy() { + stop(); + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index de7837eaa0..73f770fd86 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -650,8 +650,9 @@ public void onMessage(ConsumerRecord data) { inOrder.verify(consumer).commitSync(anyMap(), any()); inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class)); inOrder.verify(consumer).commitSync(anyMap(), any()); - container.stop(); + container.destroy(); assertThat(advised).containsExactly("one", "two", "one", "two"); + assertThat(container.isRunning()).isFalse(); } @Test