Skip to content

Introducing MultiRabbit Bootstrap #1303

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.annotation;

import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.lang.Nullable;

/**
* An {@link ImportBeanDefinitionRegistrar} class that registers
* a {@link MultiRabbitListenerAnnotationBeanPostProcessor} bean, if MultiRabbit
* is enabled.
*
* @author Wander Costa
*
* @since 1.4
*
* @see RabbitListenerAnnotationBeanPostProcessor
* @see MultiRabbitListenerAnnotationBeanPostProcessor
* @see RabbitListenerEndpointRegistry
* @see EnableRabbit
*/
public class MultiRabbitBootstrapConfiguration implements ImportBeanDefinitionRegistrar, EnvironmentAware {

private Environment environment;

@Override
public void registerBeanDefinitions(@Nullable AnnotationMetadata importingClassMetadata,
BeanDefinitionRegistry registry) {

if (isMultiRabbitEnabled() && !registry.containsBeanDefinition(
RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {

registry.registerBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
new RootBeanDefinition(MultiRabbitListenerAnnotationBeanPostProcessor.class));
}
}

private boolean isMultiRabbitEnabled() {
final String isMultiEnabledStr = this.environment.getProperty(
RabbitListenerConfigUtils.MULTI_RABBIT_ENABLED_PROPERTY);
return Boolean.parseBoolean(isMultiEnabledStr);
}

@Override
public void setEnvironment(final Environment environment) {
this.environment = environment;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,16 +16,20 @@

package org.springframework.amqp.rabbit.annotation;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Collection;

import org.springframework.amqp.core.Declarable;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.util.StringUtils;

/**
* An extension of {@link RabbitListenerAnnotationBeanPostProcessor} that associates the
* proper RabbitAdmin to the beans of Exchanges, Queues, and Bindings after they are
* created.
* An extension of {@link RabbitListenerAnnotationBeanPostProcessor} that indicates the proper
* RabbitAdmin bean to be used when processing to the listeners, and also associates it to the
* declarables (Exchanges, Queues, and Bindings) returned.
* <p>
* This processing restricts the {@link org.springframework.amqp.rabbit.core.RabbitAdmin} according to the related
* configuration, preventing the server from automatic binding non-related structures.
Expand All @@ -36,19 +40,12 @@
*/
public class MultiRabbitListenerAnnotationBeanPostProcessor extends RabbitListenerAnnotationBeanPostProcessor {

public static final String CONNECTION_FACTORY_BEAN_NAME = "multiRabbitConnectionFactory";

public static final String CONNECTION_FACTORY_CREATOR_BEAN_NAME = "rabbitConnectionFactoryCreator";

private static final String DEFAULT_RABBIT_ADMIN_BEAN_NAME = "defaultRabbitAdmin";

private static final String RABBIT_ADMIN_SUFFIX = "-admin";

@Override
protected Collection<Declarable> processAmqpListener(RabbitListener rabbitListener, Method method,
Object bean, String beanName) {
final Collection<Declarable> declarables = super.processAmqpListener(rabbitListener, method, bean, beanName);
final String rabbitAdmin = resolveMultiRabbitAdminName(rabbitListener);
final RabbitListener rabbitListenerRef = proxyIfAdminNotPresent(rabbitListener, rabbitAdmin);
final Collection<Declarable> declarables = super.processAmqpListener(rabbitListenerRef, method, bean, beanName);
for (final Declarable declarable : declarables) {
if (declarable.getDeclaringAdmins().isEmpty()) {
declarable.setAdminsThatShouldDeclare(rabbitAdmin);
Expand All @@ -57,6 +54,15 @@ protected Collection<Declarable> processAmqpListener(RabbitListener rabbitListen
return declarables;
}

private RabbitListener proxyIfAdminNotPresent(final RabbitListener rabbitListener, final String rabbitAdmin) {
if (StringUtils.hasText(rabbitListener.admin())) {
return rabbitListener;
}
return (RabbitListener) Proxy.newProxyInstance(
RabbitListener.class.getClassLoader(), new Class<?>[]{RabbitListener.class},
new RabbitListenerAdminReplacementInvocationHandler(rabbitListener, rabbitAdmin));
}

/**
* Resolves the name of the RabbitAdmin bean based on the RabbitListener, or falls back to
* the default RabbitAdmin name provided by MultiRabbit.
Expand All @@ -66,13 +72,35 @@ protected Collection<Declarable> processAmqpListener(RabbitListener rabbitListen
protected String resolveMultiRabbitAdminName(RabbitListener rabbitListener) {
String admin = super.resolveExpressionAsString(rabbitListener.admin(), "admin");
if (!StringUtils.hasText(admin) && StringUtils.hasText(rabbitListener.containerFactory())) {
admin = rabbitListener.containerFactory()
+ MultiRabbitListenerAnnotationBeanPostProcessor.RABBIT_ADMIN_SUFFIX;
admin = rabbitListener.containerFactory() + RabbitListenerConfigUtils.MULTI_RABBIT_ADMIN_SUFFIX;
}
if (!StringUtils.hasText(admin)) {
admin = MultiRabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_ADMIN_BEAN_NAME;
admin = RabbitListenerConfigUtils.RABBIT_ADMIN_BEAN_NAME;
}
return admin;
}

/**
* An {@link InvocationHandler} to provide a replacing admin() parameter of the listener.
*/
private final class RabbitListenerAdminReplacementInvocationHandler implements InvocationHandler {

private final RabbitListener target;
private final String admin;

private RabbitListenerAdminReplacementInvocationHandler(final RabbitListener target, final String admin) {
this.target = target;
this.admin = admin;
}

@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws InvocationTargetException, IllegalAccessException {
if (method.getName().equals("admin")) {
return this.admin;
}
return method.invoke(this.target, args);
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,10 @@

/**
* A {@link DeferredImportSelector} implementation with the lowest order to import a
* {@link RabbitBootstrapConfiguration} as late as possible.
* {@link MultiRabbitBootstrapConfiguration} and {@link RabbitBootstrapConfiguration}
* as late as possible.
* {@link MultiRabbitBootstrapConfiguration} has precedence to be able to provide the
* extended BeanPostProcessor, if enabled.
*
* @author Artem Bilan
*
Expand All @@ -33,7 +36,8 @@ public class RabbitListenerConfigurationSelector implements DeferredImportSelect

@Override
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
return new String[] { RabbitBootstrapConfiguration.class.getName() };
return new String[] { MultiRabbitBootstrapConfiguration.class.getName(),
RabbitBootstrapConfiguration.class.getName()};
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,4 +36,29 @@ public abstract class RabbitListenerConfigUtils {
public static final String RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME =
"org.springframework.amqp.rabbit.config.internalRabbitListenerEndpointRegistry";

/**
* The bean name of the default RabbitAdmin.
*/
public static final String RABBIT_ADMIN_BEAN_NAME = "amqpAdmin";

/**
* The bean name of the default ConnectionFactory.
*/
public static final String RABBIT_CONNECTION_FACTORY_BEAN_NAME = "rabbitConnectionFactory";

/**
* The default property to enable/disable MultiRabbit processing.
*/
public static final String MULTI_RABBIT_ENABLED_PROPERTY = "spring.multirabbitmq.enabled";

/**
* The bean name of the ContainerFactory of the default broker for MultiRabbit.
*/
public static final String MULTI_RABBIT_CONTAINER_FACTORY_BEAN_NAME = "multiRabbitContainerFactory";

/**
* The MultiRabbit admins' suffix.
*/
public static final String MULTI_RABBIT_ADMIN_SUFFIX = "-admin";

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.amqp.rabbit.annotation;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
Expand All @@ -31,13 +32,15 @@
import org.springframework.amqp.core.Declarable;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.config.MessageListenerTestContainer;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.amqp.rabbit.config.RabbitListenerContainerTestFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MethodRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
Expand Down Expand Up @@ -179,6 +182,28 @@ void testCreationOfConnections() {
context.close(); // Close and stop the listeners
}

@Test
@DisplayName("Test assignment of RabbitAdmin in the endpoint registry")
void testAssignmentOfRabbitAdminInTheEndpointRegistry() {
ConfigurableApplicationContext context = new AnnotationConfigApplicationContext(MultiConfig.class,
AutoBindingListenerTestBeans.class);

final RabbitListenerEndpointRegistry registry = context.getBean(RabbitListenerEndpointRegistry.class);
final Collection<MessageListenerContainer> listenerContainers = registry.getListenerContainers();

Assertions.assertThat(listenerContainers).hasSize(3);
listenerContainers.forEach(container -> {
Assertions.assertThat(container).isInstanceOf(MessageListenerTestContainer.class);
final MessageListenerTestContainer refContainer = (MessageListenerTestContainer) container;
final RabbitListenerEndpoint endpoint = refContainer.getEndpoint();
Assertions.assertThat(endpoint).isInstanceOf(MethodRabbitListenerEndpoint.class);
final MethodRabbitListenerEndpoint refEndpoint = (MethodRabbitListenerEndpoint) endpoint;
Assertions.assertThat(refEndpoint.getAdmin()).isNotNull();
});

context.close(); // Close and stop the listeners
}

@Component
static class AutoBindingListenerTestBeans {

Expand Down Expand Up @@ -266,7 +291,7 @@ public RabbitListenerAnnotationBeanPostProcessor postProcessor() {
return postProcessor;
}

@Bean("defaultRabbitAdmin")
@Bean(RabbitListenerConfigUtils.RABBIT_ADMIN_BEAN_NAME)
public RabbitAdmin defaultRabbitAdmin() {
return DEFAULT_RABBIT_ADMIN;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2014-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.annotation;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.core.env.Environment;

class MultiRabbitBootstrapConfigurationTest {

@Test
@DisplayName("test if MultiRabbitBPP is registered when enabled")
void testMultiRabbitBPPIsRegistered() throws Exception {
final Environment environment = Mockito.mock(Environment.class);
final ArgumentCaptor<RootBeanDefinition> captor = ArgumentCaptor.forClass(RootBeanDefinition.class);
final BeanDefinitionRegistry registry = Mockito.mock(BeanDefinitionRegistry.class);
final MultiRabbitBootstrapConfiguration bootstrapConfiguration = new MultiRabbitBootstrapConfiguration();
bootstrapConfiguration.setEnvironment(environment);

Mockito.when(environment.getProperty(RabbitListenerConfigUtils.MULTI_RABBIT_ENABLED_PROPERTY))
.thenReturn("true");

bootstrapConfiguration.registerBeanDefinitions(null, registry);

Mockito.verify(registry).registerBeanDefinition(
Mockito.eq(RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME),
captor.capture());

assertThat(captor.getValue().getBeanClass()).isEqualTo(MultiRabbitListenerAnnotationBeanPostProcessor.class);
}

@Test
@DisplayName("test if MultiRabbitBPP is not registered when disabled")
void testMultiRabbitBPPIsNotRegistered() throws Exception {
final Environment environment = Mockito.mock(Environment.class);
final BeanDefinitionRegistry registry = Mockito.mock(BeanDefinitionRegistry.class);
final MultiRabbitBootstrapConfiguration bootstrapConfiguration = new MultiRabbitBootstrapConfiguration();
bootstrapConfiguration.setEnvironment(environment);

Mockito.when(environment.getProperty(RabbitListenerConfigUtils.MULTI_RABBIT_ENABLED_PROPERTY))
.thenReturn("false");

bootstrapConfiguration.registerBeanDefinitions(null, registry);

Mockito.verify(registry, Mockito.never()).registerBeanDefinition(Mockito.anyString(),
Mockito.any(RootBeanDefinition.class));
}
}
Loading