Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix sample #234

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix sample
saragluna committed Mar 22, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit f5b6eadfd42152e79702df494e1d6cfdb954f725
Original file line number Diff line number Diff line change
@@ -3,52 +3,21 @@

package com.azure.spring.sample.servicebus;

import com.azure.spring.messaging.ConsumerIdentifier;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.servicebus.core.properties.CommonProperties;
import com.azure.spring.messaging.servicebus.core.properties.ProcessorProperties;
import com.azure.spring.messaging.servicebus.core.properties.ProducerProperties;
import com.azure.spring.messaging.servicebus.core.properties.NamespaceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Property class for Service Bus multiple namespaces sample.
*/
@ConfigurationProperties("servicebus")
@ConfigurationProperties("my.servicebus")
public class CustomizedServiceBusProperties {

@NestedConfigurationProperty
private final List<ProducerProperties> producers = new ArrayList<>();
private final List<NamespaceProperties> namespaces = new ArrayList<>();

@NestedConfigurationProperty
private final List<ProcessorProperties> processors = new ArrayList<>();

public PropertiesSupplier<String, ProducerProperties> producerPropertiesSupplier() {
Map<String, ProducerProperties> map = producers
.stream()
.collect(Collectors.toMap(CommonProperties::getEntityName, p -> p));
return map::get;
}

public PropertiesSupplier<ConsumerIdentifier, ProcessorProperties> processorPropertiesSupplier() {
Map<ConsumerIdentifier, ProcessorProperties> map = processors
.stream()
.collect(Collectors.toMap(
p -> p.getSubscriptionName() == null ? new ConsumerIdentifier(p.getEntityName()) : new ConsumerIdentifier(p.getEntityName(), p.getSubscriptionName()), p -> p));
return map::get;
}

public List<ProducerProperties> getProducers() {
return producers;
}

public List<ProcessorProperties> getProcessors() {
return processors;
public List<NamespaceProperties> getNamespaces() {
return namespaces;
}

}
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
import org.springframework.messaging.MessageHandler;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

@Configuration
@@ -54,7 +55,7 @@ public void onSuccess(Void result) {

@Override
public void onFailure(Throwable ex) {
LOGGER.info("There was an error sending the message.");
LOGGER.error("There was an error sending the message.", ex);
}
});

@@ -69,27 +70,35 @@ public AtomicInteger integerSource() {
@Bean
public IntegrationFlow sendFlow() {
return IntegrationFlows.fromSupplier(integerSource()::getAndIncrement,
c -> c.poller(Pollers.fixedRate(100)))
.<Integer, Boolean>route(p -> p % 2 == 0,
m -> m.subFlowMapping(true, f -> f.handle(new DefaultMessageHandler(QUEUE_NAME_1, firstServiceBusTemplate)))
.subFlowMapping(false, f -> f.handle(new DefaultMessageHandler(QUEUE_NAME_2, secondServiceBusTemplate))))
c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10))))
.<Integer, Boolean>route(p -> p % 2 == 0, m
-> m.subFlowMapping(true, f -> f.handle(new DefaultMessageHandler(QUEUE_NAME_1, firstServiceBusTemplate)))
.subFlowMapping(false, f -> f.handle(new DefaultMessageHandler(QUEUE_NAME_2, secondServiceBusTemplate))))
.get();
}

@Bean
public IntegrationFlow transformFlow(MessageHandler messageHandler) {
return IntegrationFlows.from(new ServiceBusInboundChannelAdapter(firstMessageListenerContainer))
.handle(m -> LOGGER.info("Receive messages from the first queue: {}", m.getPayload()))
.transform(i -> i + "transformed")
.handle(messageHandler)
.get();
public IntegrationFlow transformFlow() {
ServiceBusInboundChannelAdapter messageProducer = new ServiceBusInboundChannelAdapter(firstMessageListenerContainer);
messageProducer.setPayloadType(String.class);

return IntegrationFlows.from(messageProducer)
.transform(m -> {
LOGGER.info("Receive messages from the first queue: {}", m);
return "transformed from queue1 " + m;
})
.handle(messageSender())
.get();
}

@Bean
public IntegrationFlow secondListenerFlow() {
return IntegrationFlows.from(new ServiceBusInboundChannelAdapter(secondMessageListenerContainer))
.handle(m -> LOGGER.info("Receive messages from the second queue: {}", m.getPayload()))
.get();
ServiceBusInboundChannelAdapter messageProducer = new ServiceBusInboundChannelAdapter(secondMessageListenerContainer);
messageProducer.setPayloadType(String.class);

return IntegrationFlows.from(messageProducer)
.handle(m -> LOGGER.info("Receive messages from the second queue: {}", m.getPayload()))
.get();
}

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.azure.spring.sample.servicebus;

import com.azure.spring.messaging.ConsumerIdentifier;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.servicebus.core.*;
import com.azure.spring.messaging.servicebus.core.DefaultServiceBusNamespaceProcessorFactory;
import com.azure.spring.messaging.servicebus.core.DefaultServiceBusNamespaceProducerFactory;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.ServiceBusProducerFactory;
import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.messaging.servicebus.core.properties.ProcessorProperties;
import com.azure.spring.messaging.servicebus.core.properties.ProducerProperties;
import com.azure.spring.messaging.servicebus.core.properties.NamespaceProperties;
import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -19,47 +19,42 @@
@EnableConfigurationProperties(CustomizedServiceBusProperties.class)
public class MultipleAzureServiceBusNamespacesConfiguration {

private final CustomizedServiceBusProperties properties;
private final NamespaceProperties firstNamespaceProperties;
private final NamespaceProperties secondNamespaceProperties;

public MultipleAzureServiceBusNamespacesConfiguration(CustomizedServiceBusProperties properties) {
this.properties = properties;
this.firstNamespaceProperties = properties.getNamespaces().get(0);
this.secondNamespaceProperties = properties.getNamespaces().get(1);
}

@Bean
public ServiceBusTemplate firstServiceBusTemplate(ObjectProvider<PropertiesSupplier<String, ProducerProperties>> suppliers) {
ServiceBusProducerFactory producerFactory = new DefaultServiceBusNamespaceProducerFactory(null, suppliers.getIfAvailable());
public ServiceBusTemplate firstServiceBusTemplate() {
ServiceBusProducerFactory producerFactory = new DefaultServiceBusNamespaceProducerFactory(firstNamespaceProperties);
return new ServiceBusTemplate(producerFactory);
}

@Bean(name = "firstMessageListenerContainer")
public ServiceBusMessageListenerContainer firstMessageListenerContainer(ObjectProvider<PropertiesSupplier<ConsumerIdentifier, ProcessorProperties>> suppliers) {
ServiceBusProcessorFactory processorFactory = new DefaultServiceBusNamespaceProcessorFactory(null, suppliers.getIfAvailable());
@Bean
public ServiceBusMessageListenerContainer firstMessageListenerContainer() {
ServiceBusProcessorFactory processorFactory = new DefaultServiceBusNamespaceProcessorFactory(firstNamespaceProperties);
ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
containerProperties.setEntityName(QUEUE_NAME_1);
containerProperties.setPrefetchCount(10);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}

@Bean
public ServiceBusTemplate secondServiceBusTemplate(ObjectProvider<PropertiesSupplier<String, ProducerProperties>> suppliers) {
ServiceBusProducerFactory producerFactory = new DefaultServiceBusNamespaceProducerFactory(null, suppliers.getIfAvailable());
public ServiceBusTemplate secondServiceBusTemplate() {
ServiceBusProducerFactory producerFactory = new DefaultServiceBusNamespaceProducerFactory(secondNamespaceProperties);
return new ServiceBusTemplate(producerFactory);
}

@Bean(name = "secondMessageListenerContainer")
public ServiceBusMessageListenerContainer secondMessageListenerContainer(ObjectProvider<PropertiesSupplier<ConsumerIdentifier, ProcessorProperties>> suppliers) {
ServiceBusProcessorFactory processorFactory = new DefaultServiceBusNamespaceProcessorFactory(null, suppliers.getIfAvailable());
@Bean
public ServiceBusMessageListenerContainer secondMessageListenerContainer() {
ServiceBusProcessorFactory processorFactory = new DefaultServiceBusNamespaceProcessorFactory(secondNamespaceProperties);
ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
containerProperties.setEntityName(QUEUE_NAME_2);
containerProperties.setPrefetchCount(10);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}

@Bean
public PropertiesSupplier<String, ProducerProperties> producerPropertiesSupplier() {
return properties.producerPropertiesSupplier();
}

@Bean
public PropertiesSupplier<ConsumerIdentifier, ProcessorProperties> processorPropertiesSupplier() {
return properties.processorPropertiesSupplier();
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
servicebus.producers[0]:
entity-name: queue1
entity-type: queue
namespace: ${AZURE_SERVICEBUS_NAMESPACE_01}
servicebus.producers[1]:
entity-name: queue2
entity-type: queue
namespace: ${AZURE_SERVICEBUS_NAMESPACE_02}
servicebus.processors[0]:
entity-name: queue1
entity-type: queue
namespace: ${AZURE_SERVICEBUS_NAMESPACE_01}
my.servicebus.namespaces:
- namespace: ${AZURE_SERVICEBUS_NAMESPACE_01}
entity-type: queue
- namespace: ${AZURE_SERVICEBUS_NAMESPACE_02}
entity-type: queue

#reduce some noise
logging.level:
Original file line number Diff line number Diff line change
@@ -119,3 +119,9 @@ resource "azurerm_role_assignment" "role_servicebus_02_data_sender" {
role_definition_name = "Azure Service Bus Data Sender"
principal_id = data.azurerm_client_config.client_config.object_id
}

resource "azurerm_role_assignment" "role_servicebus_02_data_receiver" {
scope = azurerm_servicebus_queue.application_queue_02.id
role_definition_name = "Azure Service Bus Data Receiver"
principal_id = data.azurerm_client_config.client_config.object_id
}