diff --git a/buildSrc/src/main/java/org/springframework/boot/build/context/properties/DocumentConfigurationProperties.java b/buildSrc/src/main/java/org/springframework/boot/build/context/properties/DocumentConfigurationProperties.java index cbb23b79e527..4fb4b22da9bc 100644 --- a/buildSrc/src/main/java/org/springframework/boot/build/context/properties/DocumentConfigurationProperties.java +++ b/buildSrc/src/main/java/org/springframework/boot/build/context/properties/DocumentConfigurationProperties.java @@ -92,7 +92,8 @@ void documentConfigurationProperties() throws IOException { "Hikari specific settings bound to an instance of Hikari's HikariDataSource") .addSection("transaction").withKeyPrefixes("spring.jta", "spring.transaction").addSection("integration") .withKeyPrefixes("spring.activemq", "spring.artemis", "spring.batch", "spring.integration", - "spring.jms", "spring.kafka", "spring.rabbitmq", "spring.hazelcast", "spring.webservices") + "spring.jms", "spring.kafka", "spring.rabbitmq", "spring.multirabbitmq", "spring.hazelcast", + "spring.webservices") .addSection("actuator").withKeyPrefixes("management").addSection("devtools") .withKeyPrefixes("spring.devtools").addSection("testing").withKeyPrefixes("spring.test"); DocumentOptions options = builder.build(); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitAutoConfiguration.java new file mode 100644 index 000000000000..c9eaad6cabe2 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitAutoConfiguration.java @@ -0,0 +1,139 @@ +/* + * Copyright 2012-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import java.util.Map; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.impl.CredentialsProvider; +import com.rabbitmq.client.impl.CredentialsRefreshService; + +import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactoryContextWrapper; +import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy; +import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration.RabbitConnectionFactoryCreator; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Primary; +import org.springframework.core.io.ResourceLoader; + +/** + * Class responsible for auto-configuring the necessary beans to enable multiple RabbitMQ + * servers. + * + * @author Wander Costa + * @since 2.4 + */ +@Configuration(proxyBeanMethods = false) +@ConditionalOnClass({ RabbitTemplate.class, Channel.class }) +@EnableConfigurationProperties({ RabbitProperties.class, MultiRabbitProperties.class }) +@Import(RabbitAnnotationDrivenConfiguration.class) +public class MultiRabbitAutoConfiguration { + + @Configuration(proxyBeanMethods = false) + @ConditionalOnProperty(prefix = "spring.multirabbitmq", name = "enabled", havingValue = "true") + protected static class MultiRabbitConnectionFactoryCreator extends RabbitConnectionFactoryCreator + implements BeanFactoryAware, ApplicationContextAware { + + private ConfigurableListableBeanFactory beanFactory; + + private ApplicationContext applicationContext; + + @Bean + @ConditionalOnMissingBean + public ConnectionFactoryContextWrapper contextWrapper( + @Qualifier(RabbitListenerConfigUtils.RABBIT_CONNECTION_FACTORY_BEAN_NAME) final ConnectionFactory connectionFactory) { + return new ConnectionFactoryContextWrapper(connectionFactory); + } + + @Primary + @Bean(RabbitListenerConfigUtils.RABBIT_CONNECTION_FACTORY_BEAN_NAME) + public ConnectionFactory routingConnectionFactory(final RabbitProperties rabbitProperties, + final MultiRabbitProperties multiRabbitProperties, final ResourceLoader resourceLoader, + final ObjectProvider credentialsProvider, + final ObjectProvider credentialsRefreshService, + final ObjectProvider connectionNameStrategy) throws Exception { + final MultiRabbitConnectionFactoryWrapper wrapper = new MultiRabbitConnectionFactoryWrapper(); + final ConnectionFactory defaultConnectionFactory = super.rabbitConnectionFactory(rabbitProperties, + resourceLoader, credentialsProvider, credentialsRefreshService, connectionNameStrategy); + wrapper.setDefaultConnectionFactory(defaultConnectionFactory); + this.registerNewContainerFactoryBean(RabbitListenerConfigUtils.MULTI_RABBIT_CONTAINER_FACTORY_BEAN_NAME, + defaultConnectionFactory); + this.registerNewRabbitAdminBean(RabbitListenerConfigUtils.RABBIT_ADMIN_BEAN_NAME, defaultConnectionFactory); + + for (final Map.Entry entry : multiRabbitProperties.getConnections().entrySet()) { + final String key = entry.getKey(); + final RabbitProperties properties = entry.getValue(); + final ConnectionFactory connectionFactory = super.rabbitConnectionFactory(properties, resourceLoader, + credentialsProvider, credentialsRefreshService, connectionNameStrategy); + this.registerNewContainerFactoryBean(key, connectionFactory); + this.registerNewRabbitAdminBean(key.concat(RabbitListenerConfigUtils.MULTI_RABBIT_ADMIN_SUFFIX), + connectionFactory); + wrapper.addConnectionFactory(key, connectionFactory); + } + + final SimpleRoutingConnectionFactory connectionFactory = new SimpleRoutingConnectionFactory(); + connectionFactory.setTargetConnectionFactories(wrapper.getConnectionFactories()); + connectionFactory.setDefaultTargetConnectionFactory(wrapper.getDefaultConnectionFactory()); + return connectionFactory; + } + + private void registerNewContainerFactoryBean(final String name, final ConnectionFactory connectionFactory) { + final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); + containerFactory.setApplicationContext(this.applicationContext); + containerFactory.setConnectionFactory(connectionFactory); + this.beanFactory.registerSingleton(name, containerFactory); + } + + private void registerNewRabbitAdminBean(final String name, final ConnectionFactory connectionFactory) { + final RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + rabbitAdmin.setApplicationContext(this.applicationContext); + rabbitAdmin.setBeanName(name); + rabbitAdmin.afterPropertiesSet(); + this.beanFactory.registerSingleton(name, rabbitAdmin); + } + + @Override + public void setBeanFactory(final BeanFactory beanFactory) { + this.beanFactory = (ConfigurableListableBeanFactory) beanFactory; + } + + @Override + public void setApplicationContext(final ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitConnectionFactoryWrapper.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitConnectionFactoryWrapper.java new file mode 100644 index 000000000000..5ad55d018f4f --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitConnectionFactoryWrapper.java @@ -0,0 +1,60 @@ +/* + * Copyright 2012-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.util.Assert; + +/** + * A wrapper for RabbitMQ structures to offer an easy way to integrate with MultiRabbit. + * It's backed by a {@link HashMap} that holds all non-default structures, but does not + * allow null keys since there is a default field holding . + * + * @author Wander Costa + * @since 2.4 + */ +public class MultiRabbitConnectionFactoryWrapper { + + private final Map connectionFactories = new HashMap<>(); + + private final Map unmodifiableMapReference = Collections + .unmodifiableMap(this.connectionFactories); + + private ConnectionFactory defaultConnectionFactory; + + public void setDefaultConnectionFactory(final ConnectionFactory connectionFactory) { + this.defaultConnectionFactory = connectionFactory; + } + + ConnectionFactory getDefaultConnectionFactory() { + return this.defaultConnectionFactory; + } + + public void addConnectionFactory(final String key, final ConnectionFactory connectionFactory) { + Assert.hasText(key, "Key may not be null or empty"); + this.connectionFactories.put(key, connectionFactory); + } + + Map getConnectionFactories() { + return this.unmodifiableMapReference; + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitProperties.java new file mode 100644 index 000000000000..a7b12fd1cca5 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitProperties.java @@ -0,0 +1,65 @@ +/* + * Copyright 2012-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.lang.Nullable; + +/** + * Configuration properties for multiple Rabbit. + * + * @author Wander Costa + * @see RabbitProperties + * @since 2.4 + */ +@ConfigurationProperties(prefix = "spring.multirabbitmq") +public class MultiRabbitProperties { + + /** + * A flag to enable/disable MultiRabbit processing. + */ + @Value("${" + RabbitListenerConfigUtils.MULTI_RABBIT_ENABLED_PROPERTY + "}") + private boolean enabled; + + /** + * A map representing the RabbitProperties of all available brokers. + */ + private Map connections = new HashMap<>(); + + public boolean isEnabled() { + return this.enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public Map getConnections() { + return this.connections; + } + + public void setConnections(@Nullable final Map connections) { + this.connections = Optional.ofNullable(connections).orElse(new HashMap<>()); + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java index 91ee1e6acf37..294516d23c1d 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfiguration.java @@ -24,6 +24,7 @@ import com.rabbitmq.client.impl.CredentialsRefreshService; import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy; @@ -96,7 +97,7 @@ public class RabbitAutoConfiguration { @ConditionalOnMissingBean(ConnectionFactory.class) protected static class RabbitConnectionFactoryCreator { - @Bean + @Bean(RabbitListenerConfigUtils.RABBIT_CONNECTION_FACTORY_BEAN_NAME) public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, ResourceLoader resourceLoader, ObjectProvider credentialsProvider, ObjectProvider credentialsRefreshService, @@ -187,7 +188,7 @@ public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, Connec return template; } - @Bean + @Bean(RabbitListenerConfigUtils.RABBIT_ADMIN_BEAN_NAME) @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true) @ConditionalOnMissingBean diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json index ba8527ebbae5..fca03af638b6 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -1649,6 +1649,16 @@ "level": "error" } }, + { + "name": "spring.multirabbitmq.enabled", + "type": "java.lang.Boolean", + "description": "The flag to enable/disable MultiRabbit processing." + }, + { + "name": "spring.multirabbitmq.connections", + "type": "java.util.Map", + "description": "The Map containing all the connections." + }, { "name": "spring.reactor.stacktrace-mode.enabled", "description": "Whether Reactor should collect stacktrace information at runtime.", diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories b/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories index ed0fa2ed8547..af367dc5df39 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/spring.factories @@ -21,6 +21,7 @@ org.springframework.boot.autoconfigure.condition.OnWebApplicationCondition org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.boot.autoconfigure.admin.SpringApplicationAdminJmxAutoConfiguration,\ org.springframework.boot.autoconfigure.aop.AopAutoConfiguration,\ +org.springframework.boot.autoconfigure.amqp.MultiRabbitAutoConfiguration,\ org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration,\ org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration,\ org.springframework.boot.autoconfigure.cache.CacheAutoConfiguration,\ diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitAutoConfigurationCompatibilityTest.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitAutoConfigurationCompatibilityTest.java new file mode 100644 index 000000000000..98afaf20207b --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitAutoConfigurationCompatibilityTest.java @@ -0,0 +1,35 @@ +/* + * Copyright 2012-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; + +/** + * Tests to ensure compatibility of {@link MultiRabbitAutoConfiguration} with + * {@link RabbitAutoConfiguration} if MultiRabbit is not enabled. + * + * @author Wander Costa + */ +class MultiRabbitAutoConfigurationCompatibilityTest extends RabbitAutoConfigurationTests { + + MultiRabbitAutoConfigurationCompatibilityTest() { + super.contextRunner = new ApplicationContextRunner().withConfiguration( + AutoConfigurations.of(MultiRabbitAutoConfiguration.class, RabbitAutoConfiguration.class)); + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitAutoConfigurationTest.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitAutoConfigurationTest.java new file mode 100644 index 000000000000..4ed4f05dd14b --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitAutoConfigurationTest.java @@ -0,0 +1,206 @@ +/* + * Copyright 2012-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import java.util.Map; + +import org.assertj.core.api.ThrowableAssert.ThrowingCallable; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.rabbit.annotation.EnableRabbit; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.MultiRabbitListenerAnnotationBeanPostProcessor; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.connection.RoutingConnectionFactory; +import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; +import org.springframework.beans.factory.BeanCreationException; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.stereotype.Component; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for {@link MultiRabbitAutoConfiguration}. + * + * @author Wander Costa + */ +class MultiRabbitAutoConfigurationTest { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner().withConfiguration( + AutoConfigurations.of(RabbitAutoConfiguration.class, MultiRabbitAutoConfiguration.class)); + + @Test + @DisplayName("should not find MultiRabbit beans when it's disabled") + void testMultiRabbitNotLoadedWhenDisabled() { + this.contextRunner.withPropertyValues("spring.multirabbitmq.enabled", "false") + .withPropertyValues("spring.multirabbitmq.connections.broker1.port", "5673") + .withPropertyValues("spring.multirabbitmq.connections.broker2.port", "5674").run((context) -> { + final RabbitAdmin amqpAdmin = context.getBean(RabbitAdmin.class); + assertThat(amqpAdmin).isNotNull(); + + final ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class); + assertThat(connectionFactory).isNotInstanceOf(RoutingConnectionFactory.class); + + final RabbitListenerContainerFactory containerFactory = context + .getBean(RabbitListenerContainerFactory.class); + assertThat(containerFactory).isNotNull(); + + final Object annotationBPP = context + .getBean(RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME); + assertThat(annotationBPP).isNotInstanceOf(MultiRabbitListenerAnnotationBeanPostProcessor.class); + + final ThrowingCallable callable = () -> context + .getBean(MultiRabbitAutoConfiguration.MultiRabbitConnectionFactoryCreator.class); + assertThatThrownBy(callable).isInstanceOf(NoSuchBeanDefinitionException.class); + }); + } + + @Test + @DisplayName("should ensure MultiRabbit beans") + void testMultiRabbitBeans() { + final String broker1 = "broker1"; + final String broker2 = "broker2"; + this.contextRunner.withPropertyValues("spring.multirabbitmq.enabled", "true") + .withPropertyValues("spring.multirabbitmq.connections." + broker1 + ".port", "5673") + .withPropertyValues("spring.multirabbitmq.connections." + broker2 + ".port", "5674").run((context) -> { + final RabbitAdmin amqpAdmin = context.getBean(RabbitListenerConfigUtils.RABBIT_ADMIN_BEAN_NAME, + RabbitAdmin.class); + final RabbitAdmin admin1 = context.getBean( + broker1.concat(RabbitListenerConfigUtils.MULTI_RABBIT_ADMIN_SUFFIX), RabbitAdmin.class); + final RabbitAdmin admin2 = context.getBean( + broker2.concat(RabbitListenerConfigUtils.MULTI_RABBIT_ADMIN_SUFFIX), RabbitAdmin.class); + assertThat(amqpAdmin).isNotNull(); + assertThat(admin1).isNotNull(); + assertThat(admin2).isNotNull(); + + final RabbitListenerContainerFactory containerFactory = context.getBean( + RabbitListenerConfigUtils.MULTI_RABBIT_CONTAINER_FACTORY_BEAN_NAME, + RabbitListenerContainerFactory.class); + final RabbitListenerContainerFactory containerFactory1 = context.getBean(broker1, + RabbitListenerContainerFactory.class); + final RabbitListenerContainerFactory containerFactory2 = context.getBean(broker2, + RabbitListenerContainerFactory.class); + assertThat(containerFactory).isNotNull(); + assertThat(containerFactory1).isNotNull(); + assertThat(containerFactory2).isNotNull(); + + final ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class); + assertThat(connectionFactory).isInstanceOf(RoutingConnectionFactory.class); + final SimpleRoutingConnectionFactory routingConnectionFactory = (SimpleRoutingConnectionFactory) connectionFactory; + final ConnectionFactory connectionFactory1 = routingConnectionFactory + .getTargetConnectionFactory(broker1); + final ConnectionFactory connectionFactory2 = routingConnectionFactory + .getTargetConnectionFactory(broker2); + assertThat(routingConnectionFactory.getPort()).isEqualTo(5672); + assertThat(connectionFactory1).isNotNull(); + assertThat(connectionFactory1.getPort()).isEqualTo(5673); + assertThat(connectionFactory2).isNotNull(); + assertThat(connectionFactory2.getPort()).isEqualTo(5674); + + final Object annotationBPP = context + .getBean(RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME); + assertThat(annotationBPP).isInstanceOf(MultiRabbitListenerAnnotationBeanPostProcessor.class); + }); + } + + @Test + @DisplayName("should initialize declarables") + void shouldInitializeDeclarables() { + this.contextRunner.withUserConfiguration(ThreeListenersBeans.class) + .withPropertyValues("spring.multirabbitmq.enabled", "true") + .withPropertyValues("spring.multirabbitmq.connections." + ThreeListenersBeans.BROKER_NAME_1 + ".port", + "5673") + .withPropertyValues("spring.multirabbitmq.connections." + ThreeListenersBeans.BROKER_NAME_2 + ".port", + "5674") + .run((context) -> { + final Map queues = context + .getBeansOfType(org.springframework.amqp.core.Queue.class); + assertThat(queues).hasSize(3); + + final Map bindings = context.getBeansOfType(Binding.class); + assertThat(bindings).hasSize(3); + + final Map exchanges = context.getBeansOfType(DirectExchange.class); + assertThat(exchanges).hasSize(3); + }); + } + + @Test + @DisplayName("should fail to initialize listeners when MultiRabbit is disabled") + void shouldFailToInitializeListenersWhenDisabled() { + final ThrowingCallable callable = () -> new AnnotationConfigApplicationContext(ThreeListenersBeans.class, + RabbitAutoConfiguration.class, MultiRabbitAutoConfiguration.class); + assertThatThrownBy(callable).isInstanceOf(BeanCreationException.class); + } + + @Component + @EnableRabbit + private static class ThreeListenersBeans { + + public static final String EXCHANGE_0 = "exchange0"; + + public static final String ROUTING_KEY_0 = "routingKey0"; + + public static final String QUEUE_0 = "queue0"; + + public static final String BROKER_NAME_1 = "broker1"; + + public static final String EXCHANGE_1 = "exchange1"; + + public static final String ROUTING_KEY_1 = "routingKey1"; + + public static final String QUEUE_1 = "queue1"; + + public static final String BROKER_NAME_2 = "broker2"; + + public static final String EXCHANGE_2 = "exchange2"; + + public static final String ROUTING_KEY_2 = "routingKey2"; + + public static final String QUEUE_2 = "queue2"; + + @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(EXCHANGE_0), value = @Queue(QUEUE_0), + key = ROUTING_KEY_0)) + void listenBroker0(final String message) { + } + + @RabbitListener(containerFactory = BROKER_NAME_1, bindings = @QueueBinding(exchange = @Exchange(EXCHANGE_1), + value = @Queue(QUEUE_1), key = ROUTING_KEY_1)) + void listenBroker1(final String message) { + } + + @RabbitListener(containerFactory = BROKER_NAME_2, bindings = @QueueBinding(exchange = @Exchange(EXCHANGE_2), + value = @Queue(QUEUE_2), key = ROUTING_KEY_2)) + void listenBroker2(final String message) { + } + + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitConnectionFactoryWrapperTest.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitConnectionFactoryWrapperTest.java new file mode 100644 index 000000000000..fd8d651f3696 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/MultiRabbitConnectionFactoryWrapperTest.java @@ -0,0 +1,81 @@ +/* + * Copyright 2012-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.amqp; + +import org.junit.Assert; +import org.junit.function.ThrowingRunnable; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; + +/** + * Tests for {@link MultiRabbitConnectionFactoryWrapper}. + * + * @author Wander Costa + */ +@ExtendWith(MockitoExtension.class) +class MultiRabbitConnectionFactoryWrapperTest { + + private static final String DUMMY_KEY = "dummy-key"; + + @Mock + private ConnectionFactory connectionFactory; + + @Mock + private SimpleRabbitListenerContainerFactory containerFactory; + + @Mock + private RabbitAdmin rabbitAdmin; + + private final MultiRabbitConnectionFactoryWrapper wrapper = new MultiRabbitConnectionFactoryWrapper(); + + @Test + void shouldGetDefaultConnectionFactory() { + this.wrapper.setDefaultConnectionFactory(this.connectionFactory); + Assert.assertSame(this.connectionFactory, this.wrapper.getDefaultConnectionFactory()); + } + + @Test + void shouldSetNullDefaultConnectionFactory() { + this.wrapper.setDefaultConnectionFactory(null); + Assert.assertNull(this.wrapper.getDefaultConnectionFactory()); + } + + @Test + void shouldAddConnectionFactory() { + this.wrapper.addConnectionFactory(DUMMY_KEY, this.connectionFactory); + Assert.assertSame(this.connectionFactory, this.wrapper.getConnectionFactories().get(DUMMY_KEY)); + } + + @Test + void shouldNotAddNullConnectionFactory() { + final ThrowingRunnable runnable = () -> this.wrapper.addConnectionFactory(DUMMY_KEY, null); + Assert.assertThrows("ConnectionFactory may not be null", IllegalArgumentException.class, runnable); + } + + @Test + void shouldNotAddConnectionFactoryWithEmptyKey() { + final ThrowingRunnable runnable = () -> this.wrapper.addConnectionFactory("", this.connectionFactory); + Assert.assertThrows("Key may not be null or empty", IllegalArgumentException.class, runnable); + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index 4e1e23dfff0f..c38aa7135bab 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -89,7 +89,7 @@ */ class RabbitAutoConfigurationTests { - private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + protected ApplicationContextRunner contextRunner = new ApplicationContextRunner() .withConfiguration(AutoConfigurations.of(RabbitAutoConfiguration.class)); @Test diff --git a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/spring-boot-features.adoc b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/spring-boot-features.adoc index eae98d701bf5..96e1476314e2 100644 --- a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/spring-boot-features.adoc +++ b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/spring-boot-features.adoc @@ -5497,6 +5497,38 @@ See {spring-boot-autoconfigure-module-code}/amqp/RabbitProperties.java[`RabbitPr TIP: See https://spring.io/blog/2010/06/14/understanding-amqp-the-protocol-used-by-rabbitmq/[Understanding AMQP, the protocol used by RabbitMQ] for more details. +===== Multiple RabbitMQ +Additional RabbitMQ brokers can be configured through the properties under `+spring.multirabbitmq+`. +All properties available under `+spring.rabbitmq+` are also available under `spring.multirabbitmq.connections.[]`. +There is no strict limitation on the names assigned to the brokers or the number of different brokers that can be +configured. + +For example, in the configuration below, there is one default broker provided at `+spring.rabbitmq+` and +two additional brokers, provided under `spring.multirabbitmq.connections.brokerA` and +`spring.multirabbitmq.connections.brokerB`: + +[source,yaml,indent=0,configprops,configblocks] +---- + spring: + rabbitmq: + host: "192.168.0.10" + port: 5672 + username: "user" + password: "pass" + multirabbitmq: + enabled: true + connections: + brokerA: + host: "10.10.10.10" + port: 5672 + username: "admin" + password: "firstSecret" + brokerB: + host: "210.10.0.190" + port: 5672 + username: "admin" + password: "secondSecret" +---- [[boot-features-using-amqp-sending]] @@ -5549,6 +5581,59 @@ You can also customize the `RetryTemplate` programmatically by declaring a `Rabb If you need to create more `RabbitTemplate` instances or if you want to override the default, Spring Boot provides a `RabbitTemplateConfigurer` bean that you can use to initialize a `RabbitTemplate` with the same settings as the factories used by the auto-configuration. +===== MultiRabbitMQ +Once enabled, MultiRabbitMQ will instantiate a `RoutingConnectionFactory` backed by multiple `ConnectionFactory` s. + +In the example below, there is the default and two additional brokers: `brokerA` and `brokerB`. + +[source,yaml,indent=0,configprops,configblocks] +---- + spring: + rabbitmq: + # ... + multirabbitmq: + enabled: true + connections: + brokerA: + # ... + brokerB: + # ... +---- + +To send messages, it's necessary to bind to the proper `ConnectionFactory` by using `SimpleResourceHolder` or by using +`ConnectionFactoryContextWrapper`. + +[source,java,indent=0] +[subs="verbatim,quotes"] +---- + @Component + public class MyPublisher { + + @Autowired + private RabbitTemplate template; + + @Autowired + private ConnectionFactoryContextWrapper contextWrapper; + + void sendMessageToDefaultBroker(String message) { + rabbitTemplate.convertAndSend("exc", "rk", message); + } + + void sendMessageToBrokerA(String message) { + SimpleResourceHolder.bind(template.getConnectionFactory(), "brokerA"); + try { + rabbitTemplate.convertAndSend("exc", "rk", message); + } finally { + SimpleResourceHolder.unbind(template.getConnectionFactory()); + } + } + + void sendMessageToBrokerB(String message) { + contextWrapper.run("brokerB", () -> rabbitTemplate.convertAndSend("exc", "rk", message)); + } + + } +---- [[boot-features-using-amqp-receiving]] @@ -5625,6 +5710,53 @@ IMPORTANT: By default, if retries are disabled and the listener throws an except You can modify this behavior in two ways: Set the `defaultRequeueRejected` property to `false` so that zero re-deliveries are attempted or throw an `AmqpRejectAndDontRequeueException` to signal the message should be rejected. The latter is the mechanism used when retries are enabled and the maximum number of delivery attempts is reached. +===== MultiRabbitMQ +Once enabled, MultiRabbitMQ will instantiate multiple `RabbitListenerContainerFactory` s, `AmqpAdmin` s and +`ConnectionFactory` s and create an easy way to link listeners to the different brokers provided. + +In the example below, there is the default and two additional brokers: `brokerA` and `brokerB`. + +[source,yaml,indent=0,configprops,configblocks] +---- + spring: + rabbitmq: + # ... + multirabbitmq: + enabled: true + connections: + brokerA: + # ... + brokerB: + # ... +---- + +To link a `@RabbitListener` to one of the additional brokers, the listener must reference the proper `containerFactory`, +which follows the same name as the connection entry. If no `containerFactory` is provided, the listener is linked to the +default broker. + +[source,java,indent=0] +[subs="verbatim,quotes"] +---- + @Component + public class MyListeners { + + @RabbitListener(queues = "someQueue") + public void processMessageFromDefaultBroker(String content) { + // ... + } + + @RabbitListener(queues = "otherQueue", **containerFactory="brokerA"**) + public void processMessageFromBrokerA(String content) { + // ... + } + + @RabbitListener(queues = "anotherQueue", **containerFactory="brokerB"**) + public void processMessageFromBrokerB(String content) { + // ... + } + + } +---- [[boot-features-kafka]]