Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sb integration bug #25501

Merged
merged 10 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class AzureServiceBusMessagingAutoConfiguration {

@Bean
@ConditionalOnMissingBean
public NamespaceProperties serviceBusNamespaceTopicProperties(AzureServiceBusProperties properties) {
public NamespaceProperties serviceBusNamespaceProperties(AzureServiceBusProperties properties) {
NamespaceProperties namespaceProperties = new NamespaceProperties();
BeanUtils.copyProperties(properties, namespaceProperties);
copyAzureCommonProperties(properties, namespaceProperties);
Expand Down Expand Up @@ -83,7 +83,7 @@ public static class ServiceBusTemplateConfiguration {

@Bean
@ConditionalOnMissingBean
public ServiceBusProducerFactory defaultServiceBusNamespaceQueueProducerFactory(
public ServiceBusProducerFactory defaultServiceBusNamespaceProducerFactory(
NamespaceProperties properties,
ObjectProvider<PropertiesSupplier<String, ProducerProperties>> suppliers) {
return new DefaultServiceBusNamespaceProducerFactory(properties, suppliers.getIfAvailable());
Expand All @@ -98,8 +98,8 @@ public ServiceBusMessageConverter messageConverter() {
@Bean
@ConditionalOnMissingBean
@ConditionalOnBean(ServiceBusProducerFactory.class)
public ServiceBusTemplate queueOperation(ServiceBusProducerFactory senderClientfactory,
ServiceBusMessageConverter messageConverter) {
public ServiceBusTemplate serviceBusTemplate(ServiceBusProducerFactory senderClientfactory,
ServiceBusMessageConverter messageConverter) {
ServiceBusTemplate serviceBusTemplate = new ServiceBusTemplate(senderClientfactory);
serviceBusTemplate.setMessageConverter(messageConverter);
return serviceBusTemplate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void noQueueNameOrTopicNameProvidedShouldNotConfigure() {
}

@Test
void entityTypeProvidedShouldNotConfigure() {
void entityNameProvidedShouldConfigure() {
ServiceBusClientBuilder serviceBusClientBuilder = new ServiceBusClientBuilder();
serviceBusClientBuilder.connectionString(String.format(CONNECTION_STRING, "test-namespace"));

Expand All @@ -42,7 +42,7 @@ void entityTypeProvidedShouldNotConfigure() {
}

@Test
void entityNameProvidedShouldNotConfigure() {
void entityTypeProvidedShouldConfigure() {
ServiceBusClientBuilder serviceBusClientBuilder = new ServiceBusClientBuilder();
serviceBusClientBuilder.connectionString(String.format(CONNECTION_STRING, "test-namespace"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public ServiceBusTemplate getServiceBusTemplate() {
DefaultServiceBusNamespaceProducerFactory factory = new DefaultServiceBusNamespaceProducerFactory(
this.namespaceProperties, getProducerPropertiesSupplier());

factory.addListener((name) -> {
factory.addListener((name, client) -> {
DefaultInstrumentation instrumentation = new DefaultInstrumentation(name, PRODUCER);
instrumentation.markUp();
instrumentationManager.addHealthInstrumentation(instrumentation.getId(), instrumentation);
Expand All @@ -243,7 +243,7 @@ private ServiceBusProcessorContainer getProcessorContainer() {
DefaultServiceBusNamespaceProcessorFactory factory = new DefaultServiceBusNamespaceProcessorFactory(
this.namespaceProperties, getProcessorPropertiesSupplier());

factory.addListener((name, subscription) -> {
factory.addListener((name, subscription, client) -> {
String instrumentationName = name + "/" + subscription == null ? "" : subscription;
Instrumentation instrumentation = new ServiceBusProcessorInstrumentation(instrumentationName, CONSUMER, Duration.ofMinutes(2));
instrumentation.markUp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void setInstrumentationId(String instrumentationId) {
}
private class IntegrationRecordMessageProcessingListener implements RecordMessageProcessingListener {

private ServiceBusMessageConverter messageConverter;
private ServiceBusMessageConverter messageConverter = new ServiceBusMessageConverter();
private Class<?> payloadType = byte[].class;
private InstrumentationManager instrumentationManager;
private String instrumentationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,33 +75,34 @@ public EventProcessorClient createProcessor(@NonNull String eventHub, @NonNull S

@Override
public void destroy() {
this.processorClientMap.values().forEach(EventProcessorClient::stop);
this.processorClientMap.forEach((t, client) -> {
listeners.forEach(l -> l.processorRemoved(t.getT1(), t.getT2(), client));
client.stop();
});
this.processorClientMap.clear();
this.listeners.clear();
}

private EventProcessorClient doCreateProcessor(@NonNull String eventHub, @NonNull String consumerGroup,
@NonNull EventProcessingListener listener,
@Nullable ProcessorProperties properties) {
Tuple2<String, String> key = Tuples.of(eventHub, consumerGroup);
if (this.processorClientMap.containsKey(key)) {
return this.processorClientMap.get(key);
}
return processorClientMap.computeIfAbsent(key, k -> {

ProcessorProperties processorProperties = propertiesMerger.mergeParent(properties, this.namespaceProperties);
processorProperties.setEventHubName(eventHub);
processorProperties.setConsumerGroup(consumerGroup);
ProcessorProperties processorProperties = propertiesMerger.mergeParent(properties, this.namespaceProperties);
processorProperties.setEventHubName(k.getT1());
processorProperties.setConsumerGroup(k.getT2());

EventProcessorClientBuilderFactory factory =
new EventProcessorClientBuilderFactory(processorProperties, this.checkpointStore, listener);
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_EVENT_HUBS);
EventProcessorClient client = factory.build().buildEventProcessorClient();
LOGGER.info("EventProcessor created for event hub '{}' with consumer group '{}'", eventHub, consumerGroup);
EventProcessorClientBuilderFactory factory =
new EventProcessorClientBuilderFactory(processorProperties, this.checkpointStore, listener);
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_EVENT_HUBS);
EventProcessorClient client = factory.build().buildEventProcessorClient();
LOGGER.info("EventProcessor created for event hub '{}' with consumer group '{}'", eventHub, consumerGroup);

this.listeners.forEach(l -> l.processorAdded(eventHub, consumerGroup));
this.listeners.forEach(l -> l.processorAdded(k.getT1(), k.getT2(), client));

this.processorClientMap.put(key, client);

return client;
return client;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ default boolean removeListener(Listener listener) {
*/
interface Listener {

default void processorAdded(String eventHub, String consumerGroup) {
default void processorAdded(String eventHub, String consumerGroup, EventProcessorClient client) {

}

default void processorRemoved(String eventHub, String consumerGroup) {
default void processorRemoved(String eventHub, String consumerGroup, EventProcessorClient client) {
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,16 @@ public EventHubProducerAsyncClient createProducer(String eventHub) {
}

private EventHubProducerAsyncClient doCreateProducer(String eventHub, @Nullable ProducerProperties properties) {
if (this.clients.containsKey(eventHub)) {
return this.clients.get(eventHub);
}
return clients.computeIfAbsent(eventHub, entityName -> {
ProducerProperties producerProperties = parentMerger.mergeParent(properties, this.namespaceProperties);
producerProperties.setEventHubName(eventHub);
saragluna marked this conversation as resolved.
Show resolved Hide resolved
EventHubClientBuilderFactory factory = new EventHubClientBuilderFactory(producerProperties);
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_EVENT_HUBS);
EventHubProducerAsyncClient producerClient = factory.build().buildAsyncProducerClient();
this.listeners.forEach(l -> l.producerAdded(entityName, producerClient));

ProducerProperties producerProperties = parentMerger.mergeParent(properties, this.namespaceProperties);
producerProperties.setEventHubName(eventHub);
EventHubClientBuilderFactory factory = new EventHubClientBuilderFactory(producerProperties);
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_EVENT_HUBS);
EventHubProducerAsyncClient producerClient = factory.build().buildAsyncProducerClient();

this.listeners.forEach(l -> l.producerAdded(eventHub));

this.clients.put(eventHub, producerClient);
return producerClient;
return producerClient;
});
}

@Override
Expand All @@ -75,7 +71,11 @@ public boolean removeListener(Listener listener) {

@Override
public void destroy() {
this.clients.values().forEach(EventHubProducerAsyncClient::close);
this.clients.forEach((name, client) -> {
this.listeners.forEach(l -> l.producerRemoved(name, client));
client.close();
});
this.clients.clear();
this.listeners.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ default boolean removeListener(Listener listener) {
*/
interface Listener {

default void producerAdded(String name) {
default void producerAdded(String name, EventHubProducerAsyncClient client) {

}

default void producerRemoved(String name) {
default void producerRemoved(String name, EventHubProducerAsyncClient client) {
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ void setUp() {
processorAddedTimes = 0;
this.processorFactory.addListener(new EventHubsProcessorFactory.Listener() {
@Override
public void processorAdded(String eventHub, String consumerGroup) {
public void processorAdded(String eventHub, String consumerGroup, EventProcessorClient client) {
processorAddedTimes++;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void setUp() {
producerAddedTimes = 0;
this.producerFactory.addListener(new EventHubsProducerFactory.Listener() {
@Override
public void producerAdded(String name) {
public void producerAdded(String name, EventHubProducerAsyncClient client) {
producerAddedTimes++;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public boolean unsubscribe(String queue) {
public ServiceBusProcessorClient subscribe(String topic, String subscription, MessageProcessingListener listener) {
ServiceBusProcessorClient processor = this.processorFactory.createProcessor(topic, subscription, listener);
processor.start();
this.listeners.forEach(l -> l.processorAdded(topic, subscription));
this.listeners.forEach(l -> l.processorAdded(topic, subscription, processor));
this.clients.add(processor);
return processor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ public DefaultServiceBusNamespaceProcessorFactory(NamespaceProperties namespaceP
this.propertiesSupplier = supplier == null ? key -> null : supplier;
}

private <K, V> void close(Map<Tuple2<String, String>, V> map, Consumer<V> close) {
private void close(Map<Tuple2<String, String>, ServiceBusProcessorClient> map, Consumer<ServiceBusProcessorClient> close) {
map.forEach((t, p) -> {
try {
listeners.forEach(l -> l.processorRemoved(t.getT1(), t.getT2()));
listeners.forEach(l -> l.processorRemoved(t.getT1(), t.getT2(), p));
close.accept(p);
} catch (Exception ex) {
LOGGER.warn("Failed to clean service bus queue client factory", ex);
Expand Down Expand Up @@ -92,12 +92,12 @@ private ServiceBusProcessorClient doCreateProcessor(String name, String subscrip

return processorMap.computeIfAbsent(key, k -> {
ProcessorProperties processorProperties = propertiesMerger.mergeParent(properties, this.namespaceProperties);
processorProperties.setEntityName(name);
if (INVALID_SUBSCRIPTION.equals(subscription)) {
processorProperties.setEntityName(k.getT1());
if (INVALID_SUBSCRIPTION.equals(k.getT2())) {
processorProperties.setEntityType(ServiceBusEntityType.QUEUE);
} else {
processorProperties.setEntityType(ServiceBusEntityType.TOPIC);
processorProperties.setSubscriptionName(subscription);
processorProperties.setSubscriptionName(k.getT2());
}

ServiceBusProcessorClient client;
Expand All @@ -114,8 +114,8 @@ private ServiceBusProcessorClient doCreateProcessor(String name, String subscrip
client = factory.build().buildProcessorClient();
}

this.listeners.forEach(l -> l.processorAdded(name, INVALID_SUBSCRIPTION.equals(subscription) ? null
: subscription));
this.listeners.forEach(l -> l.processorAdded(k.getT1(), INVALID_SUBSCRIPTION.equals(k.getT2()) ? null
: k.getT2(), client));
return client;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ default boolean removeListener(Listener listener) {
@FunctionalInterface
interface Listener {

void processorAdded(String name, String subscription);
void processorAdded(String name, String subscription, ServiceBusProcessorClient client);

default void processorRemoved(String name, String subscription) {
default void processorRemoved(String name, String subscription, ServiceBusProcessorClient client) {
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public DefaultServiceBusNamespaceProducerFactory(NamespaceProperties namespacePr
public DefaultServiceBusNamespaceProducerFactory(NamespaceProperties namespaceProperties,
PropertiesSupplier<String, ProducerProperties> supplier) {
this.namespaceProperties = namespaceProperties;
this.propertiesSupplier = supplier;
this.propertiesSupplier = supplier == null ? key -> null : supplier;
}

public ServiceBusSenderAsyncClient createProducer(String name) {
Expand All @@ -58,7 +58,7 @@ public boolean removeListener(Listener listener) {
@Override
public void destroy() {
clients.forEach((name, producer) -> {
listeners.forEach(l -> l.producerRemoved(name));
listeners.forEach(l -> l.producerRemoved(name, producer));
producer.close();
});
this.clients.clear();
Expand All @@ -75,7 +75,7 @@ private ServiceBusSenderAsyncClient doCreateProducer(String name, @Nullable Prod
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_SERVICE_BUS);
ServiceBusSenderAsyncClient producerClient = factory.build().buildAsyncClient();

this.listeners.forEach(l -> l.producerAdded(entityName));
this.listeners.forEach(l -> l.producerAdded(entityName, producerClient));
return producerClient;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ default boolean removeListener(Listener listener) {
@FunctionalInterface
interface Listener {

void producerAdded(String name);
void producerAdded(String name, ServiceBusSenderAsyncClient client);

default void producerRemoved(String name) {
default void producerRemoved(String name, ServiceBusSenderAsyncClient client) {
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void setUp() {
this.processorFactory = new DefaultServiceBusNamespaceProcessorFactory(namespaceProperties);
queueProcessorAddedTimes = 0;
topicProcessorAddedTimes = 0;
this.processorFactory.addListener((name, subscription) -> {
this.processorFactory.addListener((name, subscription, client) -> {
if (subscription == null) {
queueProcessorAddedTimes++;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void setUp() {
namespaceProperties.setEntityType(ServiceBusEntityType.QUEUE);
this.producerFactory = new DefaultServiceBusNamespaceProducerFactory(namespaceProperties);
producerAddedTimes = 0;
this.producerFactory.addListener((name) -> producerAddedTimes++);
this.producerFactory.addListener((name, client) -> producerAddedTimes++);
}

@Test
Expand Down