From 752d54bd13ac6fa185adc529069d8171094a1636 Mon Sep 17 00:00:00 2001 From: Xiaolu Dai Date: Thu, 11 Nov 2021 12:40:13 +0800 Subject: [PATCH] change the way to supply extended properties for binding, no need for user to specify event hub name or consumer group --- .../EventHubConsumerProperties.java | 7 +-- .../EventHubProducerProperties.java | 8 +--- .../EventHubMessageChannelBinder.java | 47 ++++++++++--------- 3 files changed, 28 insertions(+), 34 deletions(-) diff --git a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/properties/EventHubConsumerProperties.java b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/properties/EventHubConsumerProperties.java index 308eae78692d1..2949682081bd0 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/properties/EventHubConsumerProperties.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/properties/EventHubConsumerProperties.java @@ -9,7 +9,7 @@ /** * */ -public class EventHubConsumerProperties { +public class EventHubConsumerProperties extends ProcessorProperties { // /** // * Whether the consumer receives messages from the beginning or end of event hub. // * If {@link StartPosition#EARLIEST}, from beginning. If {@link StartPosition#LATEST}, from end. @@ -18,7 +18,6 @@ public class EventHubConsumerProperties { // */ // private EventProcessingProperties.StartPosition startPosition = EventProcessingProperties.StartPosition.LATEST; private final CheckpointConfig checkpoint = new CheckpointConfig(); - private final ProcessorProperties processor = new ProcessorProperties(); // public EventProcessingProperties.StartPosition getStartPosition() { // return startPosition; @@ -32,8 +31,4 @@ public CheckpointConfig getCheckpoint() { return checkpoint; } - public ProcessorProperties getProcessor() { - return processor; - } - } diff --git a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/properties/EventHubProducerProperties.java b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/properties/EventHubProducerProperties.java index b13900f883018..b7b34622ec5c7 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/properties/EventHubProducerProperties.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs-core/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/properties/EventHubProducerProperties.java @@ -8,7 +8,7 @@ /** * @author Warren Zhu */ -public class EventHubProducerProperties { +public class EventHubProducerProperties extends ProducerProperties { /** * Whether the producer should act in a synchronous manner with respect to sending messages into destination. * If true, the producer will wait for a response from Event Hub after a send operation before sending next message. @@ -26,9 +26,6 @@ public class EventHubProducerProperties { */ private long sendTimeout = 10000; - private final ProducerProperties producer = new ProducerProperties(); - - public boolean isSync() { return sync; } @@ -45,7 +42,4 @@ public void setSendTimeout(long sendTimeout) { this.sendTimeout = sendTimeout; } - public ProducerProperties getProducer() { - return producer; - } } diff --git a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/EventHubMessageChannelBinder.java b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/EventHubMessageChannelBinder.java index 8d46f849a1dcc..7d79043adcc4e 100644 --- a/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/EventHubMessageChannelBinder.java +++ b/sdk/spring/spring-cloud-azure-stream-binder-eventhubs/src/main/java/com/azure/spring/cloud/stream/binder/eventhubs/EventHubMessageChannelBinder.java @@ -4,7 +4,6 @@ package com.azure.spring.cloud.stream.binder.eventhubs; import com.azure.messaging.eventhubs.CheckpointStore; -import com.azure.spring.cloud.stream.binder.eventhubs.properties.EventHubBindingProperties; import com.azure.spring.cloud.stream.binder.eventhubs.properties.EventHubConsumerProperties; import com.azure.spring.cloud.stream.binder.eventhubs.properties.EventHubExtendedBindingProperties; import com.azure.spring.cloud.stream.binder.eventhubs.properties.EventHubProducerProperties; @@ -19,6 +18,8 @@ import com.azure.spring.integration.eventhubs.inbound.EventHubInboundChannelAdapter; import com.azure.spring.integration.handler.DefaultMessageHandler; import com.azure.spring.messaging.PropertiesSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; import org.springframework.cloud.stream.binder.BinderHeaders; import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; @@ -53,6 +54,7 @@ public class EventHubMessageChannelBinder extends implements ExtendedPropertiesBinder { + private static final Logger LOGGER = LoggerFactory.getLogger(EventHubMessageChannelBinder.class); private static final ExpressionParser EXPRESSION_PARSER = new SpelExpressionParser(); private NamespaceProperties namespaceProperties; @@ -60,6 +62,10 @@ public class EventHubMessageChannelBinder extends private CheckpointStore checkpointStore; private EventHubProcessorContainer processorContainer; private EventHubExtendedBindingProperties bindingProperties = new EventHubExtendedBindingProperties(); + private final Map> + extendedProducerPropertiesMap = new ConcurrentHashMap<>(); + private final Map, ExtendedConsumerProperties> + extendedConsumerPropertiesMap = new ConcurrentHashMap<>(); private final Map eventHubsInUse = new ConcurrentHashMap<>(); @@ -72,6 +78,7 @@ protected MessageHandler createProducerMessageHandler( ProducerDestination destination, ExtendedProducerProperties producerProperties, MessageChannel errorChannel) { + extendedProducerPropertiesMap.put(destination.getName(), producerProperties); Assert.notNull(getEventHubTemplate(), "eventHubsTemplate can't be null when create a producer"); eventHubsInUse.put(destination.getName(), new EventHubInformation(null)); @@ -95,6 +102,7 @@ protected MessageHandler createProducerMessageHandler( @Override protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties properties) { + extendedConsumerPropertiesMap.put(Tuples.of(destination.getName(), group), properties); Assert.notNull(getProcessorContainer(), "eventProcessorsContainer can't be null when create a consumer"); eventHubsInUse.put(destination.getName(), new EventHubInformation(group)); @@ -158,33 +166,30 @@ public String getConsumerGroup() { private PropertiesSupplier getProducerPropertiesSupplier() { return key -> { - Map bindings = bindingProperties.getBindings(); - for (Map.Entry entry : bindings.entrySet()) { - ProducerProperties properties = bindings.get(entry.getKey()).getProducer().getProducer(); - if (properties.getEventHubName() == null) { - continue; - } - if (key.equalsIgnoreCase(properties.getEventHubName())) { - return properties; - } + if (this.extendedProducerPropertiesMap.containsKey(key)) { + EventHubProducerProperties producerProperties = this.extendedProducerPropertiesMap.get(key) + .getExtension(); + producerProperties.setEventHubName(key); + return producerProperties; + } else { + LOGGER.debug("Can't find extended properties for {}", key); + return null; } - return null; }; } private PropertiesSupplier, ProcessorProperties> getProcessorPropertiesSupplier() { return key -> { - Map bindings = bindingProperties.getBindings(); - for (Map.Entry entry : bindings.entrySet()) { - ProcessorProperties properties = bindings.get(entry.getKey()).getConsumer().getProcessor(); - if (properties.getEventHubName() == null || properties.getConsumerGroup() == null) { - continue; - } - if (key.equals(Tuples.of(properties.getEventHubName(), properties.getConsumerGroup()))) { - return properties; - } + if (this.extendedConsumerPropertiesMap.containsKey(key)) { + EventHubConsumerProperties consumerProperties = this.extendedConsumerPropertiesMap.get(key) + .getExtension(); + consumerProperties.setEventHubName(key.getT1()); + consumerProperties.setConsumerGroup(key.getT2()); + return consumerProperties; + } else { + LOGGER.debug("Can't find extended properties for destination {}, group {}", key.getT1(), key.getT2()); + return null; } - return null; }; }