diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java index 5b5f9dc41235..d5cf598b2038 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java @@ -178,7 +178,7 @@ private void applyConsumerBuilderCustomizers(List> ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( PulsarConsumerFactory pulsarConsumerFactory, SchemaResolver schemaResolver, TopicResolver topicResolver, ObjectProvider pulsarTransactionManager, - Environment environment) { + PulsarContainerFactoryCustomizers containerFactoryCustomizers, Environment environment) { PulsarContainerProperties containerProperties = new PulsarContainerProperties(); containerProperties.setSchemaResolver(schemaResolver); containerProperties.setTopicResolver(topicResolver); @@ -187,7 +187,10 @@ ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( } pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager); this.propertiesMapper.customizeContainerProperties(containerProperties); - return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties); + ConcurrentPulsarListenerContainerFactory containerFactory = new ConcurrentPulsarListenerContainerFactory<>( + pulsarConsumerFactory, containerProperties); + containerFactoryCustomizers.customize(containerFactory); + return containerFactory; } @Bean @@ -215,14 +218,18 @@ private void applyReaderBuilderCustomizers(List> cust @Bean @ConditionalOnMissingBean(name = "pulsarReaderContainerFactory") DefaultPulsarReaderContainerFactory pulsarReaderContainerFactory(PulsarReaderFactory pulsarReaderFactory, - SchemaResolver schemaResolver, Environment environment) { + SchemaResolver schemaResolver, PulsarContainerFactoryCustomizers containerFactoryCustomizers, + Environment environment) { PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties(); readerContainerProperties.setSchemaResolver(schemaResolver); if (Threading.VIRTUAL.isActive(environment)) { readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor("pulsar-reader-")); } this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties); - return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties); + DefaultPulsarReaderContainerFactory containerFactory = new DefaultPulsarReaderContainerFactory<>( + pulsarReaderFactory, readerContainerProperties); + containerFactoryCustomizers.customize(containerFactory); + return containerFactory; } @Configuration(proxyBeanMethods = false) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java index ea60717f7bdf..6716f86eb016 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java @@ -188,4 +188,11 @@ PulsarTopicBuilder pulsarTopicBuilder() { this.properties.getDefaults().getTopic().getNamespace()); } + @Bean + @ConditionalOnMissingBean + PulsarContainerFactoryCustomizers pulsarContainerFactoryCustomizers( + ObjectProvider> customizers) { + return new PulsarContainerFactoryCustomizers(customizers.orderedStream().toList()); + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizer.java new file mode 100644 index 000000000000..17e3de79b45d --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizer.java @@ -0,0 +1,39 @@ +/* + * Copyright 2012-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.pulsar; + +import org.springframework.pulsar.config.PulsarContainerFactory; + +/** + * Callback interface that can be implemented by beans wishing to customize a + * {@link PulsarContainerFactory} before it is fully initialized, in particular to tune + * its configuration. + * + * @param the type of the {@link PulsarContainerFactory} + * @author Chris Bono + * @since 3.4.0 + */ +@FunctionalInterface +public interface PulsarContainerFactoryCustomizer> { + + /** + * Customize the container factory. + * @param containerFactory the {@code PulsarContainerFactory} to customize + */ + void customize(T containerFactory); + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizers.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizers.java new file mode 100644 index 000000000000..82bd14889883 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizers.java @@ -0,0 +1,58 @@ +/* + * Copyright 2012-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.springframework.boot.util.LambdaSafe; +import org.springframework.pulsar.config.PulsarContainerFactory; +import org.springframework.pulsar.core.PulsarConsumerFactory; + +/** + * Invokes the available {@link PulsarContainerFactoryCustomizer} instances in the context + * for a given {@link PulsarConsumerFactory}. + * + * @author Chris Bono + * @since 3.4.0 + */ +public class PulsarContainerFactoryCustomizers { + + private final List> customizers; + + public PulsarContainerFactoryCustomizers(List> customizers) { + this.customizers = (customizers != null) ? new ArrayList<>(customizers) : Collections.emptyList(); + } + + /** + * Customize the specified {@link PulsarContainerFactory}. Locates all + * {@link PulsarContainerFactoryCustomizer} beans able to handle the specified + * instance and invoke {@link PulsarContainerFactoryCustomizer#customize} on them. + * @param the type of container factory + * @param containerFactory the container factory to customize + * @return the customized container factory + */ + @SuppressWarnings("unchecked") + public > T customize(T containerFactory) { + LambdaSafe.callbacks(PulsarContainerFactoryCustomizer.class, this.customizers, containerFactory) + .withLogger(PulsarContainerFactoryCustomizers.class) + .invoke((customizer) -> customizer.customize(containerFactory)); + return containerFactory; + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java index 5ca96e70579a..45d8d3037212 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java @@ -164,12 +164,15 @@ private void applyMessageConsumerBuilderCustomizers(List reactivePulsarListenerContainerFactory( ReactivePulsarConsumerFactory reactivePulsarConsumerFactory, SchemaResolver schemaResolver, - TopicResolver topicResolver) { + TopicResolver topicResolver, PulsarContainerFactoryCustomizers containerFactoryCustomizers) { ReactivePulsarContainerProperties containerProperties = new ReactivePulsarContainerProperties<>(); containerProperties.setSchemaResolver(schemaResolver); containerProperties.setTopicResolver(topicResolver); this.propertiesMapper.customizeContainerProperties(containerProperties); - return new DefaultReactivePulsarListenerContainerFactory<>(reactivePulsarConsumerFactory, containerProperties); + DefaultReactivePulsarListenerContainerFactory containerFactory = new DefaultReactivePulsarListenerContainerFactory<>( + reactivePulsarConsumerFactory, containerProperties); + containerFactoryCustomizers.customize(containerFactory); + return containerFactory; } @Bean diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java index d8d30e942e32..6a467810fc79 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -72,6 +73,7 @@ import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; import org.springframework.pulsar.listener.PulsarContainerProperties.TransactionSettings; +import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; import org.springframework.pulsar.transaction.PulsarAwareTransactionManager; import org.springframework.test.util.ReflectionTestUtils; @@ -585,6 +587,44 @@ void whenTransactionEnabledFalseListenerContainerShouldNotUseTransactions() { }); } + @Test + void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { + this.contextRunner.withUserConfiguration(ListenerContainerFactoryCustomizersConfig.class) + .run((context) -> assertThat(context).getBean(ConcurrentPulsarListenerContainerFactory.class) + .hasFieldOrPropertyWithValue("containerProperties.subscriptionName", ":bar:foo")); + } + + @TestConfiguration(proxyBeanMethods = false) + static class ListenerContainerFactoryCustomizersConfig { + + @Bean + @Order(50) + PulsarContainerFactoryCustomizer> customizerIgnored() { + return (__) -> { + throw new RuntimeException("should-not-have-matched"); + }; + } + + @Bean + @Order(200) + PulsarContainerFactoryCustomizer> customizerFoo() { + return (containerFactory) -> appendToSubscriptionName(containerFactory, ":foo"); + } + + @Bean + @Order(100) + PulsarContainerFactoryCustomizer> customizerBar() { + return (containerFactory) -> appendToSubscriptionName(containerFactory, ":bar"); + } + + private void appendToSubscriptionName(ConcurrentPulsarListenerContainerFactory containerFactory, + String valueToAppend) { + String name = Objects.toString(containerFactory.getContainerProperties().getSubscriptionName(), ""); + containerFactory.getContainerProperties().setSubscriptionName(name.concat(valueToAppend)); + } + + } + } @Nested @@ -617,7 +657,7 @@ void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() { } @Test - void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { + void whenHasUserDefinedReaderBuilderCustomizersAppliesInCorrectOrder() { this.contextRunner.withPropertyValues("spring.pulsar.reader.name=fromPropsCustomizer") .withUserConfiguration(ReaderBuilderCustomizersConfig.class) .run((context) -> { @@ -654,6 +694,13 @@ void whenVirtualThreadsAreEnabledOnJava20AndEarlierReaderShouldNotUseVirtualThre }); } + @Test + void whenHasUserDefinedFactoryCustomizersAppliesInCorrectOrder() { + this.contextRunner.withUserConfiguration(ReaderContainerFactoryCustomizersConfig.class) + .run((context) -> assertThat(context).getBean(DefaultPulsarReaderContainerFactory.class) + .hasFieldOrPropertyWithValue("containerProperties.readerListener", ":bar:foo")); + } + @TestConfiguration(proxyBeanMethods = false) static class ReaderBuilderCustomizersConfig { @@ -671,6 +718,37 @@ ReaderBuilderCustomizer customizerBar() { } + @TestConfiguration(proxyBeanMethods = false) + static class ReaderContainerFactoryCustomizersConfig { + + @Bean + @Order(50) + PulsarContainerFactoryCustomizer> customizerIgnored() { + return (__) -> { + throw new RuntimeException("should-not-have-matched"); + }; + } + + @Bean + @Order(200) + PulsarContainerFactoryCustomizer> customizerFoo() { + return (containerFactory) -> appendToReaderListener(containerFactory, ":foo"); + } + + @Bean + @Order(100) + PulsarContainerFactoryCustomizer> customizerBar() { + return (containerFactory) -> appendToReaderListener(containerFactory, ":bar"); + } + + private void appendToReaderListener(DefaultPulsarReaderContainerFactory containerFactory, + String valueToAppend) { + String name = Objects.toString(containerFactory.getContainerProperties().getReaderListener(), ""); + containerFactory.getContainerProperties().setReaderListener(name.concat(valueToAppend)); + } + + } + } @Nested diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java index c61777862e43..a0763714ca5c 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java @@ -86,6 +86,15 @@ void whenHasUserDefinedConnectionDetailsBeanDoesNotAutoConfigureBean() { .isSameAs(customConnectionDetails)); } + @Test + void whenHasUserDefinedContainerFactoryCustomizersBeanDoesNotAutoConfigureBean() { + PulsarContainerFactoryCustomizers customizers = mock(PulsarContainerFactoryCustomizers.class); + this.contextRunner + .withBean("customContainerFactoryCustomizers", PulsarContainerFactoryCustomizers.class, () -> customizers) + .run((context) -> assertThat(context).getBean(PulsarContainerFactoryCustomizers.class) + .isSameAs(customizers)); + } + @Nested class ClientTests { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizersTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizersTests.java new file mode 100644 index 000000000000..dfc290745a63 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerFactoryCustomizersTests.java @@ -0,0 +1,140 @@ +/* + * Copyright 2012-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.junit.jupiter.api.Test; +import org.mockito.BDDMockito; + +import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; +import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory; +import org.springframework.pulsar.config.ListenerContainerFactory; +import org.springframework.pulsar.config.PulsarContainerFactory; +import org.springframework.pulsar.config.PulsarListenerContainerFactory; +import org.springframework.pulsar.core.PulsarConsumerFactory; +import org.springframework.pulsar.listener.PulsarContainerProperties; +import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +/** + * Unit tests for {@link PulsarContainerFactoryCustomizers}. + * + * @author Chris Bono + */ +class PulsarContainerFactoryCustomizersTests { + + @Test + void customizeWithNullCustomizersShouldDoNothing() { + PulsarContainerFactory containerFactory = mock(PulsarContainerFactory.class); + new PulsarContainerFactoryCustomizers(null).customize(containerFactory); + BDDMockito.verifyNoInteractions(containerFactory); + } + + @SuppressWarnings("unchecked") + @Test + void customizeSimplePulsarContainerFactory() { + PulsarContainerFactoryCustomizers customizers = new PulsarContainerFactoryCustomizers( + Collections.singletonList(new SimplePulsarContainerFactoryCustomizer())); + PulsarContainerProperties containerProperties = new PulsarContainerProperties(); + ConcurrentPulsarListenerContainerFactory pulsarContainerFactory = new ConcurrentPulsarListenerContainerFactory<>( + mock(PulsarConsumerFactory.class), containerProperties); + customizers.customize(pulsarContainerFactory); + assertThat(pulsarContainerFactory.getContainerProperties().getSubscriptionName()).isEqualTo("my-subscription"); + } + + @Test + void customizeShouldCheckGeneric() { + List> list = new ArrayList<>(); + list.add(new TestCustomizer<>()); + list.add(new TestPulsarListenersContainerFactoryCustomizer()); + list.add(new TestConcurrentPulsarListenerContainerFactoryCustomizer()); + PulsarContainerFactoryCustomizers customizers = new PulsarContainerFactoryCustomizers(list); + + customizers.customize(mock(PulsarContainerFactory.class)); + assertThat(list.get(0).getCount()).isOne(); + assertThat(list.get(1).getCount()).isZero(); + assertThat(list.get(2).getCount()).isZero(); + + customizers.customize(mock(ConcurrentPulsarListenerContainerFactory.class)); + assertThat(list.get(0).getCount()).isEqualTo(2); + assertThat(list.get(1).getCount()).isOne(); + assertThat(list.get(2).getCount()).isOne(); + + customizers.customize(mock(DefaultReactivePulsarListenerContainerFactory.class)); + assertThat(list.get(0).getCount()).isEqualTo(3); + assertThat(list.get(1).getCount()).isEqualTo(2); + assertThat(list.get(2).getCount()).isOne(); + + customizers.customize(mock(DefaultPulsarReaderContainerFactory.class)); + assertThat(list.get(0).getCount()).isEqualTo(4); + assertThat(list.get(1).getCount()).isEqualTo(2); + assertThat(list.get(2).getCount()).isOne(); + } + + static class SimplePulsarContainerFactoryCustomizer + implements PulsarContainerFactoryCustomizer> { + + @Override + public void customize(ConcurrentPulsarListenerContainerFactory containerFactory) { + containerFactory.getContainerProperties().setSubscriptionName("my-subscription"); + } + + } + + /** + * Test customizer that will match all {@link PulsarListenerContainerFactory}. + */ + static class TestCustomizer> implements PulsarContainerFactoryCustomizer { + + private int count; + + @Override + public void customize(T pulsarContainerFactory) { + this.count++; + } + + int getCount() { + return this.count; + } + + } + + /** + * Test customizer that will match both + * {@link ConcurrentPulsarListenerContainerFactory} and + * {@link DefaultReactivePulsarListenerContainerFactory} as they both extend + * {@link ListenerContainerFactory}. + */ + static class TestPulsarListenersContainerFactoryCustomizer extends TestCustomizer> { + + } + + /** + * Test customizer that will match only + * {@link ConcurrentPulsarListenerContainerFactory}. + */ + static class TestConcurrentPulsarListenerContainerFactoryCustomizer + extends TestCustomizer> { + + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java index 0ecb9e85e9fb..9b2d58d0004e 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; import com.github.benmanes.caffeine.cache.Caffeine; @@ -45,6 +46,7 @@ import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; import org.springframework.core.annotation.Order; +import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.DefaultTopicResolver; import org.springframework.pulsar.core.PulsarAdministration; @@ -382,6 +384,44 @@ void injectsExpectedBeans() { }); } + @Test + void whenHasUserDefinedFactoryCustomizersAppliesInCorrectOrder() { + this.contextRunner.withUserConfiguration(ListenerContainerFactoryCustomizersConfig.class) + .run((context) -> assertThat(context).getBean(DefaultReactivePulsarListenerContainerFactory.class) + .hasFieldOrPropertyWithValue("containerProperties.subscriptionName", ":bar:foo")); + } + + @TestConfiguration(proxyBeanMethods = false) + static class ListenerContainerFactoryCustomizersConfig { + + @Bean + @Order(50) + PulsarContainerFactoryCustomizer> customizerIgnored() { + return (__) -> { + throw new RuntimeException("should-not-have-matched"); + }; + } + + @Bean + @Order(200) + PulsarContainerFactoryCustomizer> customizerFoo() { + return (containerFactory) -> appendToSubscriptionName(containerFactory, ":foo"); + } + + @Bean + @Order(100) + PulsarContainerFactoryCustomizer> customizerBar() { + return (containerFactory) -> appendToSubscriptionName(containerFactory, ":bar"); + } + + private void appendToSubscriptionName(DefaultReactivePulsarListenerContainerFactory containerFactory, + String valueToAppend) { + String name = Objects.toString(containerFactory.getContainerProperties().getSubscriptionName(), ""); + containerFactory.getContainerProperties().setSubscriptionName(name.concat(valueToAppend)); + } + + } + } @Nested diff --git a/spring-boot-project/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc b/spring-boot-project/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc index 558456f1ecad..bc6a8c6e3837 100644 --- a/spring-boot-project/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc +++ b/spring-boot-project/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc @@ -150,11 +150,11 @@ include-code::MyBean[] Spring Boot auto-configuration provides all the components necessary for `PulsarListener`, such as the `PulsarListenerContainerFactory` and the consumer factory it uses to construct the underlying Pulsar consumers. You can configure these components by specifying any of the `spring.pulsar.listener.\*` and `spring.pulsar.consumer.*` prefixed application properties. -If you need more control over the consumer factory configuration, consider registering one or more `ConsumerBuilderCustomizer` beans. +If you need more control over the configuration of the consumer factory used by the container factory to create consumers, consider registering one or more `ConsumerBuilderCustomizer` beans. These customizers are applied to all consumers created by the factory, and therefore all `@PulsarListener` instances. You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@PulsarListener` annotation. - +If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer>` beans. [[messaging.pulsar.receiving-reactive]] == Receiving a Message Reactively @@ -165,13 +165,13 @@ The following component creates a reactive listener endpoint on the `someTopic` include-code::MyBean[] Spring Boot auto-configuration provides all the components necessary for `ReactivePulsarListener`, such as the `ReactivePulsarListenerContainerFactory` and the consumer factory it uses to construct the underlying reactive Pulsar consumers. -You can configure these components by specifying any of the `spring.pulsar.listener.*` and `spring.pulsar.consumer.*` prefixed application properties. +You can configure these components by specifying any of the `spring.pulsar.listener.\*` and `spring.pulsar.consumer.*` prefixed application properties. -If you need more control over the consumer factory configuration, consider registering one or more `ReactiveMessageConsumerBuilderCustomizer` beans. +If you need more control over the configuration of the consumer factory used by the container factory to create consumers, consider registering one or more `ReactiveMessageConsumerBuilderCustomizer` beans. These customizers are applied to all consumers created by the factory, and therefore all `@ReactivePulsarListener` instances. You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@ReactivePulsarListener` annotation. - +If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer>` beans. [[messaging.pulsar.reading]] == Reading a Message @@ -187,10 +187,11 @@ include-code::MyBean[] The `@PulsarReader` relies on a `PulsarReaderFactory` to create the underlying Pulsar reader. Spring Boot auto-configuration provides this reader factory which can be customized by setting any of the `spring.pulsar.reader.*` prefixed application properties. -If you need more control over the reader factory configuration, consider registering one or more `ReaderBuilderCustomizer` beans. +If you need more control over the configuration of the reader factory used by the container factory to create readers, consider registering one or more `ReaderBuilderCustomizer` beans. These customizers are applied to all readers created by the factory, and therefore all `@PulsarReader` instances. You can also customize a single listener by setting the `readerCustomizer` attribute of the `@PulsarReader` annotation. +If you need more control over the actual container factory configuration, consider registering one or more `PulsarContainerFactoryCustomizer>` beans. [[messaging.pulsar.reading-reactive]]