Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor multi-namespace sample of spring integration service bus #233

Merged
merged 6 commits into from
Mar 23, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public String send(@RequestParam("message") String message) {
*/
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler messageSender(EventHubsTemplate queueOperation) {
DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, queueOperation);
public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,21 @@

package com.azure.spring.sample.servicebus;

import com.azure.spring.messaging.ConsumerIdentifier;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.servicebus.core.properties.CommonProperties;
import com.azure.spring.messaging.servicebus.core.properties.ProcessorProperties;
import com.azure.spring.messaging.servicebus.core.properties.ProducerProperties;
import com.azure.spring.messaging.servicebus.core.properties.NamespaceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Property class for Service Bus multiple namespaces sample.
*/
@ConfigurationProperties("servicebus")
@ConfigurationProperties("my.servicebus")
public class CustomizedServiceBusProperties {

@NestedConfigurationProperty
private final List<ProducerProperties> producers = new ArrayList<>();
private final List<NamespaceProperties> namespaces = new ArrayList<>();

@NestedConfigurationProperty
private final List<ProcessorProperties> processors = new ArrayList<>();

public PropertiesSupplier<String, ProducerProperties> producerPropertiesSupplier() {
Map<String, ProducerProperties> map = producers
.stream()
.collect(Collectors.toMap(CommonProperties::getEntityName, p -> p));
return map::get;
}

public PropertiesSupplier<ConsumerIdentifier, ProcessorProperties> processorPropertiesSupplier() {
Map<ConsumerIdentifier, ProcessorProperties> map = processors
.stream()
.collect(Collectors.toMap(
p -> p.getSubscriptionName() == null ? new ConsumerIdentifier(p.getEntityName()) : new ConsumerIdentifier(p.getEntityName(), p.getSubscriptionName()), p -> p));
return map::get;
}

public List<ProducerProperties> getProducers() {
return producers;
}

public List<ProcessorProperties> getProcessors() {
return processors;
public List<NamespaceProperties> getNamespaces() {
return namespaces;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.azure.spring.sample.servicebus;

import com.azure.spring.integration.core.handler.DefaultMessageHandler;
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.messaging.MessageHandler;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

@Configuration
public class IntegrationFlowConfiguration {

@Value("${my.servicebus.namespaces[0].entity-name:}")
private String firstQueueName;

@Value("${my.servicebus.namespaces[1].entity-name:}")
private String secondQueueName;

private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationFlowConfiguration.class);


private final ServiceBusTemplate firstServiceBusTemplate;
private final ServiceBusTemplate secondServiceBusTemplate;
private final ServiceBusMessageListenerContainer firstMessageListenerContainer;
private final ServiceBusMessageListenerContainer secondMessageListenerContainer;

public IntegrationFlowConfiguration(@Qualifier("firstServiceBusTemplate") ServiceBusTemplate firstServiceBusTemplate,
@Qualifier("secondServiceBusTemplate") ServiceBusTemplate secondServiceBusTemplate,
@Qualifier("firstMessageListenerContainer") ServiceBusMessageListenerContainer firstMessageListenerContainer,
@Qualifier("secondMessageListenerContainer") ServiceBusMessageListenerContainer secondMessageListenerContainer) {
this.firstServiceBusTemplate = firstServiceBusTemplate;
this.secondServiceBusTemplate = secondServiceBusTemplate;
this.firstMessageListenerContainer = firstMessageListenerContainer;
this.secondMessageListenerContainer = secondMessageListenerContainer;
}


@Bean
public MessageHandler firstMessageHandler() {
DefaultMessageHandler handler = new DefaultMessageHandler(firstQueueName, firstServiceBusTemplate);
return handler;
}

@Bean
public ServiceBusInboundChannelAdapter firstServiceBusInboundChannelAdapter() {
ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(firstMessageListenerContainer);
channelAdapter.setPayloadType(String.class);
return channelAdapter;
}

@Bean
public MessageHandler secondMessageHandler() {
DefaultMessageHandler handler = new DefaultMessageHandler(secondQueueName, secondServiceBusTemplate);
return handler;
}

@Bean
public ServiceBusInboundChannelAdapter secondServiceBusInboundChannelAdapter() {
ServiceBusInboundChannelAdapter channelAdapter = new ServiceBusInboundChannelAdapter(secondMessageListenerContainer);
channelAdapter.setPayloadType(String.class);
return channelAdapter;
}

@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}

@Bean
public IntegrationFlow sendFlow() {
return IntegrationFlows.fromSupplier(integerSource()::getAndIncrement,
c -> c.poller(Pollers.fixedRate(Duration.ofSeconds(10))))
.<Integer, Boolean>route(p -> p % 2 == 0, m
-> m.subFlowMapping(true, f -> f.handle(firstMessageHandler()))
.subFlowMapping(false, f -> f.handle(secondMessageHandler())))
.get();
}

@Bean
public IntegrationFlow transformFlow() {
return IntegrationFlows.from(firstServiceBusInboundChannelAdapter())
.transform(m -> {
LOGGER.info("Receive messages from the first queue: {}", m);
return "transformed from queue1 " + m;
})
.handle(secondMessageHandler())
.get();
}

@Bean
public IntegrationFlow secondListenerFlow() {
return IntegrationFlows.from(secondServiceBusInboundChannelAdapter())
.handle(m -> LOGGER.info("Receive messages from the second queue: {}", m.getPayload()))
.get();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.azure.spring.sample.servicebus;

import com.azure.spring.messaging.servicebus.core.DefaultServiceBusNamespaceProcessorFactory;
import com.azure.spring.messaging.servicebus.core.DefaultServiceBusNamespaceProducerFactory;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.ServiceBusProducerFactory;
import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.messaging.servicebus.core.properties.NamespaceProperties;
import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableConfigurationProperties(CustomizedServiceBusProperties.class)
public class MultipleAzureServiceBusNamespacesConfiguration {

private final NamespaceProperties firstNamespaceProperties;
private final NamespaceProperties secondNamespaceProperties;

public MultipleAzureServiceBusNamespacesConfiguration(CustomizedServiceBusProperties properties) {
this.firstNamespaceProperties = properties.getNamespaces().get(0);
this.secondNamespaceProperties = properties.getNamespaces().get(1);
}

@Bean
public ServiceBusTemplate firstServiceBusTemplate() {
ServiceBusProducerFactory producerFactory = new DefaultServiceBusNamespaceProducerFactory(firstNamespaceProperties);
return new ServiceBusTemplate(producerFactory);
}

@Bean
public ServiceBusMessageListenerContainer firstMessageListenerContainer() {
ServiceBusProcessorFactory processorFactory = new DefaultServiceBusNamespaceProcessorFactory(firstNamespaceProperties);
ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
containerProperties.setEntityName(firstNamespaceProperties.getEntityName());
containerProperties.setPrefetchCount(10);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}

@Bean
public ServiceBusTemplate secondServiceBusTemplate() {
ServiceBusProducerFactory producerFactory = new DefaultServiceBusNamespaceProducerFactory(secondNamespaceProperties);
return new ServiceBusTemplate(producerFactory);
}

@Bean
public ServiceBusMessageListenerContainer secondMessageListenerContainer() {
ServiceBusProcessorFactory processorFactory = new DefaultServiceBusNamespaceProcessorFactory(secondNamespaceProperties);
ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
containerProperties.setEntityName(secondNamespaceProperties.getEntityName());
containerProperties.setPrefetchCount(10);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}

}

This file was deleted.

Loading