diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java index 2360f72bd4..1073e175f8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java @@ -174,4 +174,16 @@ */ String clientIdPrefix() default ""; + /** + * A pseudo bean name used in SpEL expressions within this annotation to reference + * the current bean within which this listener is defined. This allows access to + * properties and methods within the enclosing bean. + * Default '__listener'. + *

+ * Example: {@code topics = "#{__listener.topicList}"}. + * @return the pseudo bean name. + * @since 2.1.2 + */ + String beanRef() default "__listener"; + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 2600e14ad3..9b57e464c3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -40,12 +41,14 @@ import org.springframework.beans.factory.BeanInitializationException; import org.springframework.beans.factory.ListableBeanFactory; import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.springframework.beans.factory.ObjectFactory; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.config.BeanExpressionContext; import org.springframework.beans.factory.config.BeanExpressionResolver; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.beans.factory.config.Scope; import org.springframework.context.expression.StandardBeanExpressionResolver; import org.springframework.core.MethodIntrospector; import org.springframework.core.Ordered; @@ -128,6 +131,8 @@ public class KafkaListenerAnnotationBeanPostProcessor private final Log logger = LogFactory.getLog(getClass()); + private final ListenerScope listenerScope = new ListenerScope(); + private KafkaListenerEndpointRegistry endpointRegistry; private String containerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME; @@ -192,7 +197,8 @@ public void setBeanFactory(BeanFactory beanFactory) { this.beanFactory = beanFactory; if (beanFactory instanceof ConfigurableListableBeanFactory) { this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver(); - this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory, null); + this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory, + this.listenerScope); } } @@ -384,6 +390,10 @@ private Method checkProxy(Method methodArg, Object bean) { protected void processListener(MethodKafkaListenerEndpoint endpoint, KafkaListener kafkaListener, Object bean, Object adminTarget, String beanName) { + String beanRef = kafkaListener.beanRef(); + if (StringUtils.hasText(beanRef)) { + this.listenerScope.addListener(beanRef, bean); + } endpoint.setBean(bean); endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory); endpoint.setId(getEndpointId(kafkaListener)); @@ -416,6 +426,9 @@ protected void processListener(MethodKafkaListenerEndpoint endpoint, Kafka } this.registrar.registerEndpoint(endpoint, factory); + if (StringUtils.hasText(beanRef)) { + this.listenerScope.removeListener(beanRef); + } } private String getEndpointId(KafkaListener kafkaListener) { @@ -723,4 +736,46 @@ protected boolean isEmptyPayload(Object payload) { } + private static class ListenerScope implements Scope { + + private final Map listeners = new HashMap<>(); + + ListenerScope() { + super(); + } + + public void addListener(String key, Object bean) { + this.listeners.put(key, bean); + } + + public void removeListener(String key) { + this.listeners.remove(key); + } + + @Override + public Object get(String name, ObjectFactory objectFactory) { + return this.listeners.get(name); + } + + @Override + public Object remove(String name) { + return null; + } + + @Override + public void registerDestructionCallback(String name, Runnable callback) { + } + + @Override + public Object resolveContextualObject(String key) { + return this.listeners.get(key); + } + + @Override + public String getConversationId() { + return null; + } + + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java index baf7096de4..a064636b72 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java @@ -70,18 +70,24 @@ public class BatchListenerConversionTests { @Autowired private KafkaTemplate template; - @SuppressWarnings("unchecked") @Test public void testBatchOfPojos() throws Exception { + doTest(this.config.listener1(), "blc1"); + doTest(this.config.listener2(), "blc2"); + } + + private void doTest(Listener listener, String topic) throws InterruptedException { this.template.send(new GenericMessage<>( - new Foo("bar"), Collections.singletonMap(KafkaHeaders.TOPIC, "blc1"))); + new Foo("bar"), Collections.singletonMap(KafkaHeaders.TOPIC, topic))); this.template.send(new GenericMessage<>( - new Foo("baz"), Collections.singletonMap(KafkaHeaders.TOPIC, "blc1"))); - assertThat(config.listener().latch1.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(config.listener().received).isInstanceOf(List.class); - assertThat(((List) config.listener().received).size()).isGreaterThan(0); - assertThat(((List) config.listener().received).get(0)).isInstanceOf(Foo.class); - assertThat(((List) config.listener().received).get(0).bar).isEqualTo("bar"); + new Foo("baz"), Collections.singletonMap(KafkaHeaders.TOPIC, topic))); + assertThat(listener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.received.size()).isGreaterThan(0); + assertThat(listener.received.get(0)).isInstanceOf(Foo.class); + assertThat(listener.received.get(0).bar).isEqualTo("bar"); + assertThat((listener.receivedTopics).get(0)).isEqualTo(topic); + assertThat((listener.receivedPartitions).get(0)).isEqualTo(0); } @Configuration @@ -134,24 +140,55 @@ public Map producerConfigs() { } @Bean - public Listener listener() { - return new Listener(); + public Listener listener1() { + return new Listener("blc1"); + } + + @Bean + public Listener listener2() { + return new Listener("blc2"); } } public static class Listener { + private final String topic; + private final CountDownLatch latch1 = new CountDownLatch(1); - private Object received; + private final CountDownLatch latch2 = new CountDownLatch(1); + + private List received; + + private List receivedTopics; - @KafkaListener(topics = "blc1") - public void listen(List foos, @Header(KafkaHeaders.OFFSET) List offsets) { - this.received = foos; + private List receivedPartitions; + + public Listener(String topic) { + this.topic = topic; + } + + @KafkaListener(topics = "#{__listener.topic}", groupId = "#{__listener.topic}.group") + public void listen1(List foos, @Header(KafkaHeaders.RECEIVED_TOPIC) List topics, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List partitions) { + if (this.received == null) { + this.received = foos; + } + this.receivedTopics = topics; + this.receivedPartitions = partitions; this.latch1.countDown(); } + @KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group2") + public void listen2(List foos) { + this.latch2.countDown(); + } + + public String getTopic() { + return this.topic; + } + } public static class Foo { diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index a0d41aff19..1573eece89 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -756,6 +756,66 @@ public void listen(List> list, Acknowledgment ac Starting with _version 2.0_, the `id` attribute (if present) is used as the Kafka `group.id` property, overriding the configured property in the consumer factory, if present. You can also set `groupId` explicitly, or set `idIsGroup` to false, to restore the previous behavior of using the consumer factory `group.id`. +You can use property placeholders or SpEL expressions within annotation properties, for example... + +[source, java] +---- +@KafkaListener(topics = "${some.property}") + +@KafkaListener(topics = "#{someBean.someProperty}", + groupId = "#{someBean.someProperty}.group") +---- + +Starting with _version 2.1.2_, the SpEL expressions support a special token `__kafkaListener__` which is a pseudo bean name which represents the current bean instance within which this annotation exists. + +For example, given... + +[source, java] +---- +@Bean +public Listener listener1() { + return new Listener("topic1"); +} + +@Bean +public Listener listener2() { + return new Listener("topic2"); +} +---- + +...we can use... + +[source, java] +---- +public class Listener { + + private final String topic; + + public Listener(String topic) { + this.topic = topic; + } + + @KafkaListener(topics = "#{__listener.topic}", + groupId = "#{__listener.topic}.group") + public void listen(...) { + ... + } + + public String getTopic() { + return this.topic; + } + +} +---- + +If, in the unlikely event that you have an actual bean called `__listener`, you can change the expression token using the `beanRef` attribute... + +[source, java] +---- +@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", + groupId = "#{__x.topic}.group") +---- + ===== Container Thread Naming Listener containers currently use two task executors, one to invoke the consumer and another which will be used to invoke the listener, when the kafka consumer property `enable.auto.commit` is `false`.