Skip to content

Commit 53f357d

Browse files
garyrussellartembilan
authored andcommitted
GH-970: Add getGroupId() to containers
Resolves #970 - allow retrieval of the `group.id`, even if not set on the container properties - also add `getAllListenerContainers` to the `RLERegistry` as a convenience - also add `getListenerId` to return the id or bean name of the container
1 parent dfd7c7a commit 53f357d

File tree

7 files changed

+85
-7
lines changed

7 files changed

+85
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2016 the original author or authors.
2+
* Copyright 2014-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -112,11 +112,28 @@ public Set<String> getListenerContainerIds() {
112112
/**
113113
* Return the managed {@link MessageListenerContainer} instance(s).
114114
* @return the managed {@link MessageListenerContainer} instance(s).
115+
* @see #getAllListenerContainers()
115116
*/
116117
public Collection<MessageListenerContainer> getListenerContainers() {
117118
return Collections.unmodifiableCollection(this.listenerContainers.values());
118119
}
119120

121+
/**
122+
* Return all {@link MessageListenerContainer} instances including those managed by
123+
* this registry and those declared as beans in the application context.
124+
* Prototype-scoped containers will be included. Lazy beans that have not yet been
125+
* created will not be initialized by a call to this method.
126+
* @return the {@link MessageListenerContainer} instance(s).
127+
* @since 2.2.5
128+
* @see #getListenerContainers()
129+
*/
130+
public Collection<MessageListenerContainer> getAllListenerContainers() {
131+
List<MessageListenerContainer> containers = new ArrayList<>();
132+
containers.addAll(getListenerContainers());
133+
containers.addAll(this.applicationContext.getBeansOfType(MessageListenerContainer.class, true, false).values());
134+
return containers;
135+
}
136+
120137
/**
121138
* Create a message listener container for the given {@link KafkaListenerEndpoint}.
122139
* <p>This create the necessary infrastructure to honor that endpoint

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.springframework.kafka.core.ConsumerFactory;
3838
import org.springframework.kafka.event.ContainerStoppedEvent;
3939
import org.springframework.kafka.support.TopicPartitionInitialOffset;
40+
import org.springframework.lang.Nullable;
4041
import org.springframework.util.Assert;
4142
import org.springframework.util.StringUtils;
4243

@@ -245,6 +246,21 @@ public ContainerProperties getContainerProperties() {
245246
return this.containerProperties;
246247
}
247248

249+
@Override
250+
public String getGroupId() {
251+
return this.containerProperties.getGroupId() == null
252+
? (String) this.consumerFactory
253+
.getConfigurationProperties()
254+
.get(ConsumerConfig.GROUP_ID_CONFIG)
255+
: this.containerProperties.getGroupId();
256+
}
257+
258+
@Override
259+
@Nullable
260+
public String getListenerId() {
261+
return this.beanName; // the container factory sets the bean name to the id attribute
262+
}
263+
248264
@Override
249265
public void setupMessageListener(Object messageListener) {
250266
this.containerProperties.setMessageListener(messageListener);

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -448,10 +448,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
448448

449449
private final TransactionTemplate transactionTemplate;
450450

451-
private final String consumerGroupId = this.containerProperties.getGroupId() == null
452-
? (String) KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties()
453-
.get(ConsumerConfig.GROUP_ID_CONFIG)
454-
: this.containerProperties.getGroupId();
451+
private final String consumerGroupId = getGroupId();
455452

456453
private final TaskScheduler taskScheduler;
457454

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 the original author or authors.
2+
* Copyright 2016-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424
import org.apache.kafka.common.TopicPartition;
2525

2626
import org.springframework.context.SmartLifecycle;
27+
import org.springframework.lang.Nullable;
2728

2829
/**
2930
* Internal abstraction used by the framework representing a message
@@ -114,4 +115,25 @@ default void setAutoStartup(boolean autoStartup) {
114115
// empty
115116
}
116117

118+
/**
119+
* Return the {@code group.id} property for this container whether specifically set on the
120+
* container or via a consumer property on the consumer factory.
121+
* @return the group id.
122+
* @since 2.2.5
123+
*/
124+
default String getGroupId() {
125+
throw new UnsupportedOperationException("This container does not support retrieving the group id");
126+
}
127+
128+
/**
129+
* The 'id' attribute of a {@code @KafkaListener} or the bean name for spring-managed
130+
* containers.
131+
* @return the id or bean name.
132+
* @since 2.2.5
133+
*/
134+
@Nullable
135+
default String getListenerId() {
136+
throw new UnsupportedOperationException("This container does not support retrieving the listener id");
137+
}
138+
117139
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingBatchErrorHandlerTests.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Arrays;
2828
import java.util.Collection;
2929
import java.util.Collections;
30+
import java.util.Iterator;
3031
import java.util.LinkedHashMap;
3132
import java.util.List;
3233
import java.util.Map;
@@ -96,6 +97,16 @@ public void stopContainerAfterException() throws Exception {
9697
inOrder.verify(this.consumer).unsubscribe();
9798
inOrder.verify(this.consumer).close();
9899
inOrder.verifyNoMoreInteractions();
100+
assertThat(this.registry.getListenerContainers()).hasSize(1);
101+
Collection<MessageListenerContainer> containers = this.registry.getAllListenerContainers();
102+
assertThat(containers).hasSize(2);
103+
Iterator<MessageListenerContainer> iterator = containers.iterator();
104+
MessageListenerContainer one = iterator.next();
105+
MessageListenerContainer two = iterator.next();
106+
assertThat(one).isNotSameAs(two);
107+
assertThat(two).isSameAs(this.config.springManagedContainer());
108+
assertThat(one.getListenerId()).isEqualTo(CONTAINER_ID);
109+
assertThat(two.getListenerId()).isEqualTo("springManagedContainer");
99110
}
100111

101112
@Configuration
@@ -178,7 +189,7 @@ public Consumer consumer() {
178189

179190
@SuppressWarnings({ "rawtypes", "unchecked" })
180191
@Bean
181-
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
192+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
182193
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
183194
factory.setConsumerFactory(consumerFactory());
184195
factory.getContainerProperties().setAckOnError(false);
@@ -203,6 +214,13 @@ public void handle(Exception thrownException, ConsumerRecords<?, ?> records,
203214
return factory;
204215
}
205216

217+
@Bean
218+
public ConcurrentMessageListenerContainer<String, String> springManagedContainer() {
219+
ConcurrentMessageListenerContainer<String, String> container = kafkaListenerContainerFactory()
220+
.createContainer("springManaged");
221+
container.setAutoStartup(false);
222+
return container;
223+
}
206224
}
207225

208226
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ public void testDelegateType() throws Exception {
186186
.collect(Collectors.toList()));
187187
}
188188
});
189+
assertThat(container.getGroupId()).isEqualTo("delegate");
189190
container.start();
190191

191192
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
@@ -270,9 +271,11 @@ public void testNoResetPolicy() throws Exception {
270271
trace.set(new RuntimeException().getStackTrace());
271272
latch1.countDown();
272273
});
274+
containerProps.setGroupId("delegateGroup");
273275
KafkaMessageListenerContainer<Integer, String> container =
274276
new KafkaMessageListenerContainer<>(cf, containerProps);
275277
container.setBeanName("delegate");
278+
assertThat(container.getGroupId()).isEqualTo("delegateGroup");
276279
container.start();
277280

278281
int n = 0;

src/reference/asciidoc/kafka.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1320,6 +1320,11 @@ private KafkaListenerEndpointRegistry registry;
13201320
----
13211321
====
13221322

1323+
The registry only maintains the life cycle of containers it manages; containers declared as beans are not managed by the registry and can be obtained from the application context.
1324+
A collection of managed containers can be obtained by calling the registry's `getListenerContainers()` method.
1325+
Version 2.2.5 added a convenience method `getAllListenerContainers()`, which returns a collection of all containers, including those managed by the registry and those declared as beans.
1326+
The collection returned will include any prototype beans that have been initialized, but it will not initialize any lazy bean declarations.
1327+
13231328
[[kafka-validation]]
13241329
===== `@KafkaListener` `@Payload` Validation
13251330

0 commit comments

Comments
 (0)