Skip to content

Commit ddde5e3

Browse files
garyrussellartembilan
authored andcommitted
GH-475 Fix Listener Container SmartLifecycle phase
Fixes #475 Previously, containers were started in phase 0; thy should be started in a late phase. __cherry-pick to 2.0.x, 1.3.x__ (cherry picked from commit 39bf448) # Conflicts: # src/reference/asciidoc/kafka.adoc
1 parent bf82fde commit ddde5e3

File tree

3 files changed

+46
-6
lines changed

3 files changed

+46
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.springframework.context.ConfigurableApplicationContext;
3939
import org.springframework.context.SmartLifecycle;
4040
import org.springframework.context.event.ContextRefreshedEvent;
41+
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
4142
import org.springframework.kafka.listener.MessageListenerContainer;
4243
import org.springframework.util.Assert;
4344
import org.springframework.util.StringUtils;
@@ -72,7 +73,7 @@ public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifec
7273
private final Map<String, MessageListenerContainer> listenerContainers =
7374
new ConcurrentHashMap<String, MessageListenerContainer>();
7475

75-
private int phase = Integer.MAX_VALUE;
76+
private int phase = AbstractMessageListenerContainer.DEFAULT_PHASE;
7677

7778
private ConfigurableApplicationContext applicationContext;
7879

@@ -191,10 +192,11 @@ protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint
191192
}
192193

193194
int containerPhase = listenerContainer.getPhase();
194-
if (containerPhase < Integer.MAX_VALUE) { // a custom phase value
195-
if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
196-
throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
197-
this.phase + " vs " + containerPhase);
195+
if (listenerContainer.isAutoStartup() &&
196+
containerPhase != AbstractMessageListenerContainer.DEFAULT_PHASE) { // a custom phase value
197+
if (this.phase != AbstractMessageListenerContainer.DEFAULT_PHASE && this.phase != containerPhase) {
198+
throw new IllegalStateException("Encountered phase mismatch between container "
199+
+ "factory definitions: " + this.phase + " vs " + containerPhase);
198200
}
199201
this.phase = listenerContainer.getPhase();
200202
}

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@
4545
public abstract class AbstractMessageListenerContainer<K, V>
4646
implements MessageListenerContainer, BeanNameAware, ApplicationEventPublisherAware, SmartLifecycle {
4747

48+
/**
49+
* The default {@link SmartLifecycle} phase for listener containers {@value #DEFAULT_PHASE}.
50+
*/
51+
public static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100; // late phase
52+
4853
protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR
4954

5055
/**
@@ -108,7 +113,7 @@ public enum AckMode {
108113

109114
private boolean autoStartup = true;
110115

111-
private int phase = 0;
116+
private int phase = DEFAULT_PHASE;
112117

113118
private volatile boolean running = false;
114119

src/reference/asciidoc/kafka.adoc

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,14 @@ public interface Acknowledgment {
461461

462462
This gives the listener control over when offsets are committed.
463463

464+
[[container-auto-startup]]
465+
====== Listener Container Auto Startup
466+
467+
The listener containers implement `SmartLifecycle` and `autoStartup` is `true` by default; the containers are started in a late phase (`Integer.MAX-VALUE - 100`).
468+
Other components that implement `SmartLifecycle`, that handle data from listeners, should be started in an earlier phase.
469+
The `- 100` leaves room for later phases to enable components to be auto-started after the containers.
470+
471+
464472
[[kafka-listener-annotation]]
465473
===== @KafkaListener Annotation
466474

@@ -676,6 +684,31 @@ static class MultiListenerBean {
676684
}
677685
----
678686

687+
[[kafkalistener-lifecycle]]
688+
===== @KafkaListener Lifecycle Management
689+
690+
The listener containers created for `@KafkaListener` annotations are not beans in the application context.
691+
Instead, they are registered with an infrastructure bean of type `KafkaListenerEndpointRegistry`.
692+
This bean manages the containers' lifecycles; it will auto-start any containers that have `autoStartup` set to `true`.
693+
All containers created by all container factories must be in the same `phase` - see <<container-auto-startup>> for more information.
694+
You can manage the lifecycle programmatically using the registry; starting/stopping the registry will start/stop all the registered containers.
695+
Or, you can get a reference to an individual container using its `id` attribute; you can set `autoStartup` on the annotation, which will override the default setting configured into the container factory.
696+
697+
[source, java]
698+
----
699+
@Autowired
700+
private KafkaListenerEndpointRegistry registry;
701+
702+
...
703+
704+
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
705+
public void listen(...) { ... }
706+
707+
...
708+
709+
registry.getListenerContainer("myContainer").start();
710+
----
711+
679712
===== Filtering Messages
680713

681714
In certain scenarios, such as rebalancing, a message may be redelivered that has already been processed.

0 commit comments

Comments
 (0)