diff --git a/eventhubs/spring-cloud-azure-starter-integration-eventhubs/eventhubs-integration/src/main/java/com/azure/spring/sample/eventhubs/SendController.java b/eventhubs/spring-cloud-azure-starter-integration-eventhubs/eventhubs-integration/src/main/java/com/azure/spring/sample/eventhubs/SendController.java index 80df22b38..a69575b25 100644 --- a/eventhubs/spring-cloud-azure-starter-integration-eventhubs/eventhubs-integration/src/main/java/com/azure/spring/sample/eventhubs/SendController.java +++ b/eventhubs/spring-cloud-azure-starter-integration-eventhubs/eventhubs-integration/src/main/java/com/azure/spring/sample/eventhubs/SendController.java @@ -47,8 +47,8 @@ public String send(@RequestParam("message") String message) { */ @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) - public MessageHandler messageSender(EventHubsTemplate queueOperation) { - DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, queueOperation); + public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { + DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback() { @Override public void onSuccess(Void result) { diff --git a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/README.md b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/README.md index e6b4287ee..9a98480df 100644 --- a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/README.md +++ b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/README.md @@ -1,11 +1,12 @@ -# Using Spring Integration for Azure Service Bus With Multiple Destinations +# Using Spring Integration to Interact with Multiple Azure Service Bus Namespaces -This code sample demonstrates how to use Spring Integration for Azure Service Bus with multiple destinations. +This code sample demonstrates how to use Spring Integration to interact with multiple Azure Service Bus namespaces. ## What You Will Build - -You will build an application that using Spring Integration for Azure Service Bus to send and receive messages from one queue in one Service Bus namespace and then forward them to another queue in another Service Bus namespace. +You will build an application that using Spring Integration for Azure Service Bus to send and receive messages with two Service Bus namespaces. +This sample will send messages to two queues in two different Azure Service Bus namespaces and then receive the messages. +In addition, this sample will forward all messages received from the first queue to the second queue. ## What You Need @@ -154,12 +155,11 @@ You can debug your sample by adding the saved output values to the tool's enviro Verify in your app’s logs that similar messages were posted: ```shell -Message was sent successfully for queue1. +Receive messages from the first queue: 1 +Receive messages from the second queue: 2 ... -New message received: 'hello' -Message 'hello' successfully checkpointed +Receive messages from the second queue: transformed from queue1, 1 ... -Message was sent successfully for queue2. ``` diff --git a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/CustomizedServiceBusProperties.java b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/CustomizedServiceBusProperties.java index 60dc3a994..1f358d3e2 100644 --- a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/CustomizedServiceBusProperties.java +++ b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/CustomizedServiceBusProperties.java @@ -3,52 +3,21 @@ package com.azure.spring.sample.servicebus; -import com.azure.spring.messaging.ConsumerIdentifier; -import com.azure.spring.messaging.PropertiesSupplier; -import com.azure.spring.messaging.servicebus.core.properties.CommonProperties; -import com.azure.spring.messaging.servicebus.core.properties.ProcessorProperties; -import com.azure.spring.messaging.servicebus.core.properties.ProducerProperties; +import com.azure.spring.messaging.servicebus.core.properties.NamespaceProperties; import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.boot.context.properties.NestedConfigurationProperty; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; /** * Property class for Service Bus multiple namespaces sample. */ -@ConfigurationProperties("servicebus") +@ConfigurationProperties("my.servicebus") public class CustomizedServiceBusProperties { - @NestedConfigurationProperty - private final List producers = new ArrayList<>(); + private final List namespaces = new ArrayList<>(); - @NestedConfigurationProperty - private final List processors = new ArrayList<>(); - - public PropertiesSupplier producerPropertiesSupplier() { - Map map = producers - .stream() - .collect(Collectors.toMap(CommonProperties::getEntityName, p -> p)); - return map::get; - } - - public PropertiesSupplier processorPropertiesSupplier() { - Map map = processors - .stream() - .collect(Collectors.toMap( - p -> p.getSubscriptionName() == null ? new ConsumerIdentifier(p.getEntityName()) : new ConsumerIdentifier(p.getEntityName(), p.getSubscriptionName()), p -> p)); - return map::get; - } - - public List getProducers() { - return producers; - } - - public List getProcessors() { - return processors; + public List getNamespaces() { + return namespaces; } - } diff --git a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/IntegrationFlowConfiguration.java b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/IntegrationFlowConfiguration.java new file mode 100644 index 000000000..2749e3f92 --- /dev/null +++ b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/IntegrationFlowConfiguration.java @@ -0,0 +1,106 @@ +package com.azure.spring.sample.servicebus; + +import com.azure.spring.integration.core.handler.DefaultMessageHandler; +import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter; +import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate; +import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.dsl.Pollers; +import org.springframework.messaging.MessageHandler; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; + +@Configuration +public class IntegrationFlowConfiguration { + + @Value("${my.servicebus.namespaces[0].entity-name:}") + private String firstQueueName; + + @Value("${my.servicebus.namespaces[1].entity-name:}") + private String secondQueueName; + + private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationFlowConfiguration.class); + + + private final ServiceBusTemplate firstServiceBusTemplate; + private final ServiceBusTemplate secondServiceBusTemplate; + private final ServiceBusMessageListenerContainer firstMessageListenerContainer; + private final ServiceBusMessageListenerContainer secondMessageListenerContainer; + + public IntegrationFlowConfiguration(@Qualifier("firstServiceBusTemplate") ServiceBusTemplate firstServiceBusTemplate, + @Qualifier("secondServiceBusTemplate") ServiceBusTemplate secondServiceBusTemplate, + @Qualifier("firstMessageListenerContainer") ServiceBusMessageListenerContainer firstMessageListenerContainer, + @Qualifier("secondMessageListenerContainer") ServiceBusMessageListenerContainer secondMessageListenerContainer) { + this.firstServiceBusTemplate = firstServiceBusTemplate; + this.secondServiceBusTemplate = secondServiceBusTemplate; + this.firstMessageListenerContainer = firstMessageListenerContainer; + this.secondMessageListenerContainer = secondMessageListenerContainer; + } + + + @Bean + public MessageHandler firstMessageHandler() { + return new DefaultMessageHandler(firstQueueName, firstServiceBusTemplate); + } + + @Bean + public ServiceBusInboundChannelAdapter firstServiceBusInboundChannelAdapter() { + ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(firstMessageListenerContainer); + channelAdapter.setPayloadType(String.class); + return channelAdapter; + } + + @Bean + public MessageHandler secondMessageHandler() { + return new DefaultMessageHandler(secondQueueName, secondServiceBusTemplate); + } + + @Bean + public ServiceBusInboundChannelAdapter secondServiceBusInboundChannelAdapter() { + ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(secondMessageListenerContainer); + channelAdapter.setPayloadType(String.class); + return channelAdapter; + } + + @Bean + public AtomicInteger integerSource() { + return new AtomicInteger(); + } + + @Bean + public IntegrationFlow sendFlow() { + return IntegrationFlows.fromSupplier(integerSource()::getAndIncrement, + c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10)))) + .route(p -> p % 2 == 0, m + -> m.subFlowMapping(true, f -> f.handle(firstMessageHandler())) + .subFlowMapping(false, f -> f.handle(secondMessageHandler()))) + .get(); + } + + @Bean + public IntegrationFlow transformFlow() { + return IntegrationFlows.from(firstServiceBusInboundChannelAdapter()) + .transform(m -> { + LOGGER.info("Receive messages from the first queue: {}", m); + return "transformed from queue1, " + m; + }) + .handle(secondMessageHandler()) + .get(); + } + + @Bean + public IntegrationFlow secondListenerFlow() { + return IntegrationFlows.from(secondServiceBusInboundChannelAdapter()) + .handle(m -> LOGGER.info("Receive messages from the second queue: {}", m.getPayload())) + .get(); + } + +} diff --git a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/MultipleAzureServiceBusNamespacesConfiguration.java b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/MultipleAzureServiceBusNamespacesConfiguration.java new file mode 100644 index 000000000..82c47b91e --- /dev/null +++ b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/MultipleAzureServiceBusNamespacesConfiguration.java @@ -0,0 +1,57 @@ +package com.azure.spring.sample.servicebus; + +import com.azure.spring.messaging.servicebus.core.DefaultServiceBusNamespaceProcessorFactory; +import com.azure.spring.messaging.servicebus.core.DefaultServiceBusNamespaceProducerFactory; +import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory; +import com.azure.spring.messaging.servicebus.core.ServiceBusProducerFactory; +import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate; +import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer; +import com.azure.spring.messaging.servicebus.core.properties.NamespaceProperties; +import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@EnableConfigurationProperties(CustomizedServiceBusProperties.class) +public class MultipleAzureServiceBusNamespacesConfiguration { + + private final NamespaceProperties firstNamespaceProperties; + private final NamespaceProperties secondNamespaceProperties; + + public MultipleAzureServiceBusNamespacesConfiguration(CustomizedServiceBusProperties properties) { + this.firstNamespaceProperties = properties.getNamespaces().get(0); + this.secondNamespaceProperties = properties.getNamespaces().get(1); + } + + @Bean + public ServiceBusTemplate firstServiceBusTemplate() { + ServiceBusProducerFactory producerFactory = new DefaultServiceBusNamespaceProducerFactory(firstNamespaceProperties); + return new ServiceBusTemplate(producerFactory); + } + + @Bean + public ServiceBusMessageListenerContainer firstMessageListenerContainer() { + ServiceBusProcessorFactory processorFactory = new DefaultServiceBusNamespaceProcessorFactory(firstNamespaceProperties); + ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); + containerProperties.setEntityName(firstNamespaceProperties.getEntityName()); + containerProperties.setPrefetchCount(10); + return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); + } + + @Bean + public ServiceBusTemplate secondServiceBusTemplate() { + ServiceBusProducerFactory producerFactory = new DefaultServiceBusNamespaceProducerFactory(secondNamespaceProperties); + return new ServiceBusTemplate(producerFactory); + } + + @Bean + public ServiceBusMessageListenerContainer secondMessageListenerContainer() { + ServiceBusProcessorFactory processorFactory = new DefaultServiceBusNamespaceProcessorFactory(secondNamespaceProperties); + ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); + containerProperties.setEntityName(secondNamespaceProperties.getEntityName()); + containerProperties.setPrefetchCount(10); + return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); + } + +} diff --git a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/MultipleNamespacesAzureServiceBusMessagingAutoConfiguration.java b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/MultipleNamespacesAzureServiceBusMessagingAutoConfiguration.java deleted file mode 100644 index f6fad7104..000000000 --- a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/MultipleNamespacesAzureServiceBusMessagingAutoConfiguration.java +++ /dev/null @@ -1,131 +0,0 @@ -package com.azure.spring.sample.servicebus; - -import com.azure.spring.cloud.autoconfigure.servicebus.AzureServiceBusAutoConfiguration; -import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter; -import com.azure.spring.messaging.ConsumerIdentifier; -import com.azure.spring.messaging.PropertiesSupplier; -import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory; -import com.azure.spring.messaging.servicebus.core.ServiceBusProducerFactory; -import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate; -import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer; -import com.azure.spring.messaging.servicebus.core.properties.ProcessorProperties; -import com.azure.spring.messaging.servicebus.core.properties.ProducerProperties; -import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties; -import com.azure.spring.messaging.servicebus.implementation.core.DefaultServiceBusNamespaceProcessorFactory; -import com.azure.spring.messaging.servicebus.implementation.core.DefaultServiceBusNamespaceProducerFactory; -import com.azure.spring.messaging.servicebus.support.converter.ServiceBusMessageConverter; -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.boot.autoconfigure.AutoConfigureAfter; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.messaging.MessageChannel; - -@Configuration(proxyBeanMethods = false) -@AutoConfigureAfter(AzureServiceBusAutoConfiguration.class) -@Import({ - MultipleNamespacesAzureServiceBusMessagingAutoConfiguration.ServiceBusTemplateConfiguration.class, - MultipleNamespacesAzureServiceBusMessagingAutoConfiguration.ProcessorContainerConfiguration.class -}) -public class MultipleNamespacesAzureServiceBusMessagingAutoConfiguration { - - private static final String RECEIVE_QUEUE_NAME = "queue1"; - private static final String INPUT_CHANNEL = "queue1.input"; - - @Autowired - private CustomizedServiceBusProperties properties; - - /** - * Configure the {@link ServiceBusProcessorFactory} - */ - @Configuration(proxyBeanMethods = false) - public static class ProcessorContainerConfiguration { - - @Bean - @ConditionalOnMissingBean - public ServiceBusProcessorFactory defaultServiceBusNamespaceProcessorFactory( - ObjectProvider> suppliers) { - return new DefaultServiceBusNamespaceProcessorFactory(null, suppliers.getIfAvailable()); - } - - } - - /** - * Configure the {@link ServiceBusTemplate} - */ - @Configuration(proxyBeanMethods = false) - public static class ServiceBusTemplateConfiguration { - - @Bean - @ConditionalOnMissingBean - public ServiceBusProducerFactory defaultServiceBusNamespaceProducerFactory( - ObjectProvider> suppliers) { - return new DefaultServiceBusNamespaceProducerFactory(null, suppliers.getIfAvailable()); - } - - @Bean - @ConditionalOnMissingBean - public ServiceBusMessageConverter messageConverter() { - return new ServiceBusMessageConverter(); - } - - @Bean - @ConditionalOnMissingBean - @ConditionalOnBean(ServiceBusProducerFactory.class) - public ServiceBusTemplate serviceBusTemplate(ServiceBusProducerFactory senderClientFactory, - ServiceBusMessageConverter messageConverter) { - ServiceBusTemplate serviceBusTemplate = new ServiceBusTemplate(senderClientFactory); - serviceBusTemplate.setMessageConverter(messageConverter); - return serviceBusTemplate; - } - } - - @Bean - public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { - ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); - containerProperties.setEntityName(RECEIVE_QUEUE_NAME); - containerProperties.setAutoComplete(false); - return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); - } - - /** - * {@link ServiceBusInboundChannelAdapter} binding with {@link MessageChannel} has name {@value INPUT_CHANNEL} - * - * @param inputChannel the MessageChannel binding with ServiceBusInboundChannelAdapter - * @param listenerContainer instance of ServiceBusProcessorContainer - * @return instance of ServiceBusInboundChannelAdapter - */ - @Bean - public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( - @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, - ServiceBusMessageListenerContainer listenerContainer) { - ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); - adapter.setOutputChannel(inputChannel); - return adapter; - } - - /** - * {@link MessageChannel} with name {@value INPUT_CHANNEL} - * - * @return {@link MessageChannel} - */ - @Bean(name = INPUT_CHANNEL) - public MessageChannel input() { - return new DirectChannel(); - } - - @Bean - public PropertiesSupplier producerPropertiesSupplier() { - return properties.producerPropertiesSupplier(); - } - - @Bean - public PropertiesSupplier processorPropertiesSupplier() { - return properties.processorPropertiesSupplier(); - } -} diff --git a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/QueueReceiveService.java b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/QueueReceiveService.java deleted file mode 100644 index 383d0a764..000000000 --- a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/QueueReceiveService.java +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.spring.sample.servicebus; - -import com.azure.spring.integration.core.handler.DefaultMessageHandler; -import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter; -import com.azure.spring.messaging.AzureHeaders; -import com.azure.spring.messaging.checkpoint.Checkpointer; -import com.azure.spring.cloud.service.servicebus.properties.ServiceBusEntityType; -import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.integration.annotation.MessagingGateway; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.stereotype.Service; -import org.springframework.util.concurrent.ListenableFutureCallback; - -@Service -public class QueueReceiveService { - - private static final Logger LOGGER = LoggerFactory.getLogger(QueueReceiveService.class); - private static final String INPUT_CHANNEL = "queue1.input"; - private static final String OUTPUT_CHANNEL_QUEUE2 = "queue2.output"; - private static final String FORWARD_QUEUE_NAME = "queue2"; - - @Autowired - private QueueForwardGateway messagingGateway; - - /** - * This message receiver binding with {@link ServiceBusInboundChannelAdapter} - * via {@link MessageChannel} has name {@value INPUT_CHANNEL} - */ - @ServiceActivator(inputChannel = INPUT_CHANNEL) - public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { - String message = new String(payload); - LOGGER.info("New message received: '{}'", message); - checkpointer.success() - .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) - .doOnError(e -> LOGGER.error("Error found", e)) - .subscribe(); - this.messagingGateway.send(message); - } - - - /** - * Get messages from {@link MessageChannel} with name {@value OUTPUT_CHANNEL_QUEUE2} - * and send messages to queue with name {@value FORWARD_QUEUE_NAME}. - * - * @param serviceBusTemplate template to send messages - * @return instance of {@link MessageChannel} - */ - @Bean - @ServiceActivator(inputChannel = OUTPUT_CHANNEL_QUEUE2) - public MessageHandler queueMessageForwarder(ServiceBusTemplate serviceBusTemplate) { - serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); - DefaultMessageHandler handler = new DefaultMessageHandler(FORWARD_QUEUE_NAME, serviceBusTemplate); - handler.setSendCallback(new ListenableFutureCallback() { - @Override - public void onSuccess(Void result) { - LOGGER.info("Message was sent successfully for {}.", FORWARD_QUEUE_NAME); - } - - @Override - public void onFailure(Throwable ex) { - LOGGER.info("There was an error sending the message."); - } - }); - - return handler; - } - - /** - * Message gateway binding with {@link MessageHandler} - * via {@link MessageChannel} has name {@value OUTPUT_CHANNEL_QUEUE2} - */ - @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL_QUEUE2) - public interface QueueForwardGateway { - void send(String text); - } -} diff --git a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/QueueSendService.java b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/QueueSendService.java deleted file mode 100644 index 6fd86e804..000000000 --- a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/QueueSendService.java +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.spring.sample.servicebus; - -import com.azure.spring.integration.core.handler.DefaultMessageHandler; -import com.azure.spring.cloud.service.servicebus.properties.ServiceBusEntityType; -import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.context.annotation.Bean; -import org.springframework.integration.annotation.MessagingGateway; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; -import org.springframework.stereotype.Service; -import org.springframework.util.concurrent.ListenableFutureCallback; - - -@Service -public class QueueSendService { - - private static final Logger LOGGER = LoggerFactory.getLogger(QueueSendService.class); - private static final String OUTPUT_CHANNEL_QUEUE1 = "queue1.output"; - private static final String QUEUE_NAME = "queue1"; - - @Bean - @ServiceActivator(inputChannel = OUTPUT_CHANNEL_QUEUE1) - public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { - serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); - DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); - handler.setSendCallback(new ListenableFutureCallback() { - @Override - public void onSuccess(Void result) { - LOGGER.info("Message was sent successfully for {}.", QUEUE_NAME); - } - - @Override - public void onFailure(Throwable ex) { - LOGGER.info("There was an error sending the message."); - } - }); - - return handler; - } - - /** - * Message gateway binding with {@link MessageHandler} - * via {@link MessageChannel} has name {@value OUTPUT_CHANNEL_QUEUE1} - */ - @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL_QUEUE1) - public interface QueueSendGateway { - void send(String text); - } -} diff --git a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/ServiceBusIntegrationApplication.java b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/ServiceBusIntegrationApplication.java index df3ef9f9f..2f5ff4601 100644 --- a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/ServiceBusIntegrationApplication.java +++ b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/java/com/azure/spring/sample/servicebus/ServiceBusIntegrationApplication.java @@ -3,31 +3,17 @@ package com.azure.spring.sample.servicebus; -import com.azure.spring.sample.servicebus.QueueSendService.QueueSendGateway; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Configuration; import org.springframework.integration.config.EnableIntegration; @SpringBootApplication @EnableIntegration -@EnableConfigurationProperties(CustomizedServiceBusProperties.class) -@Configuration(proxyBeanMethods = false) -public class ServiceBusIntegrationApplication implements CommandLineRunner { +public class ServiceBusIntegrationApplication { - @Autowired - private QueueSendGateway messagingGateway; public static void main(String[] args) { SpringApplication.run(ServiceBusIntegrationApplication.class, args); } - @Override - public void run(String... args) { - this.messagingGateway.send("hello"); - } - } diff --git a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/resources/application.yaml b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/resources/application.yaml index 970d24781..4dcdb9b98 100644 --- a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/resources/application.yaml +++ b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/src/main/resources/application.yaml @@ -1,15 +1,11 @@ -servicebus.producers[0]: - entity-name: queue1 - entity-type: queue - namespace: ${AZURE_SERVICEBUS_NAMESPACE_01} -servicebus.producers[1]: - entity-name: queue2 - entity-type: queue - namespace: ${AZURE_SERVICEBUS_NAMESPACE_02} -servicebus.processors[0]: - entity-name: queue1 - entity-type: queue - namespace: ${AZURE_SERVICEBUS_NAMESPACE_01} +my.servicebus.namespaces: + - namespace: ${AZURE_SERVICEBUS_NAMESPACE_01} + entity-type: queue + entity-name: ${AZURE_SERVICEBUS_NAMESPACE_01_QUEUE_NAME} + - namespace: ${AZURE_SERVICEBUS_NAMESPACE_02} + entity-type: queue + entity-name: ${AZURE_SERVICEBUS_NAMESPACE_02_QUEUE_NAME} + #reduce some noise logging.level: diff --git a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/terraform/main.tf b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/terraform/main.tf index c37485b1b..ec1d9176a 100644 --- a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/terraform/main.tf +++ b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/terraform/main.tf @@ -119,3 +119,9 @@ resource "azurerm_role_assignment" "role_servicebus_02_data_sender" { role_definition_name = "Azure Service Bus Data Sender" principal_id = data.azurerm_client_config.client_config.object_id } + +resource "azurerm_role_assignment" "role_servicebus_02_data_receiver" { + scope = azurerm_servicebus_queue.application_queue_02.id + role_definition_name = "Azure Service Bus Data Receiver" + principal_id = data.azurerm_client_config.client_config.object_id +} diff --git a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/terraform/outputs.tf b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/terraform/outputs.tf index f56a56b86..e3fedb897 100644 --- a/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/terraform/outputs.tf +++ b/servicebus/spring-cloud-azure-starter-integration-servicebus/multiple-namespaces/terraform/outputs.tf @@ -8,6 +8,16 @@ output "AZURE_SERVICEBUS_NAMESPACE_02" { description = "The name of servicebus_02 namespace." } +output "AZURE_SERVICEBUS_NAMESPACE_01_QUEUE_NAME" { + value = azurerm_servicebus_queue.application_queue_01.name + description = "The name the queue in servicebus_01 namespace." +} + +output "AZURE_SERVICEBUS_NAMESPACE_02_QUEUE_NAME" { + value = azurerm_servicebus_queue.application_queue_02.name + description = "The name the queue in servicebus_02 namespace." +} + output "RESOURCE_GROUP_NAME" { value = azurerm_resource_group.main.name description = "The resource group name."