properties = new HashMap<>();
properties.put("sync", sync);
properties.put("sendTimeout", sendTimeoutExpression);
properties.put("destination", destination);
-
return properties;
}
@@ -201,7 +222,7 @@ public Expression getSendTimeoutExpression() {
public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
Assert.notNull(sendTimeoutExpression, "'sendTimeoutExpression' must not be null");
this.sendTimeoutExpression = sendTimeoutExpression;
- LOG.info("DefaultMessageHandler syncTimeout becomes: {}", sendTimeoutExpression);
+ LOGGER.info("DefaultMessageHandler syncTimeout becomes: {}", sendTimeoutExpression);
}
protected MessageChannel getSendFailureChannel() {
diff --git a/sdk/spring/azure-spring-integration-core/src/main/java/com/azure/spring/integration/handler/reactor/DefaultMessageHandler.java b/sdk/spring/azure-spring-integration-core/src/main/java/com/azure/spring/integration/handler/reactor/DefaultMessageHandler.java
deleted file mode 100644
index b0edd6304b0af..0000000000000
--- a/sdk/spring/azure-spring-integration-core/src/main/java/com/azure/spring/integration/handler/reactor/DefaultMessageHandler.java
+++ /dev/null
@@ -1,250 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.spring.integration.handler.reactor;
-
-import com.azure.spring.messaging.AzureHeaders;
-import com.azure.spring.messaging.AzureSendFailureException;
-import com.azure.spring.messaging.PartitionSupplier;
-import com.azure.spring.messaging.core.reactor.SendOperation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.expression.EvaluationContext;
-import org.springframework.expression.Expression;
-import org.springframework.expression.common.LiteralExpression;
-import org.springframework.integration.MessageTimeoutException;
-import org.springframework.integration.expression.ExpressionUtils;
-import org.springframework.integration.expression.ValueExpression;
-import org.springframework.integration.handler.AbstractMessageProducingHandler;
-import org.springframework.integration.support.DefaultErrorMessageStrategy;
-import org.springframework.integration.support.ErrorMessageStrategy;
-import org.springframework.lang.NonNull;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageDeliveryException;
-import org.springframework.messaging.MessageHeaders;
-import org.springframework.util.Assert;
-import org.springframework.util.StringUtils;
-import org.springframework.util.concurrent.ListenableFutureCallback;
-import reactor.core.publisher.Mono;
-
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Base class of outbound adapter to publish to azure backed messaging service
- *
- *
- * It delegates real operation to {@link SendOperation} which supports synchronous and asynchronous sending.
- *
- * @author Warren Zhu
- * @author Xiaolu
- */
-public class DefaultMessageHandler extends AbstractMessageProducingHandler {
- private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMessageHandler.class);
- private static final long DEFAULT_SEND_TIMEOUT = 10000;
- private final String destination;
- private final SendOperation sendOperation;
- private boolean sync = false;
- private ListenableFutureCallback sendCallback;
- private EvaluationContext evaluationContext;
- private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT);
- private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
- private Expression partitionKeyExpression;
- private Expression partitionIdExpression;
- private MessageChannel sendFailureChannel;
- private String sendFailureChannelName;
-
- public DefaultMessageHandler(String destination, @NonNull SendOperation sendOperation) {
- Assert.hasText(destination, "destination can't be null or empty");
- this.destination = destination;
- this.sendOperation = sendOperation;
- }
-
- @Override
- protected void onInit() {
- super.onInit();
- this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
- LOGGER.info("Started DefaultMessageHandler with properties: {}", buildPropertiesMap());
- }
-
- @Override
- protected void handleMessageInternal(Message> message) {
- PartitionSupplier partitionSupplier = toPartitionSupplier(message);
- String destination = toDestination(message);
- final Mono mono = this.sendOperation.sendAsync(destination, message, partitionSupplier);
-
- if (this.sync) {
- waitingSendResponse(mono, message);
- } else {
- handleSendResponseAsync(mono, message);
- }
-
- }
-
- private void handleSendResponseAsync(Mono mono, Message> message) {
- mono.doOnError(ex -> {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn("{} sent failed in async mode due to {}", message, ex.getMessage());
- }
- if (this.sendCallback != null) {
- this.sendCallback.onFailure(ex);
- }
-
- if (getSendFailureChannel() != null) {
- this.messagingTemplate.send(getSendFailureChannel(), getErrorMessageStrategy()
- .buildErrorMessage(new AzureSendFailureException(message, ex), null));
- }
- }).doOnSuccess(t -> {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} sent successfully in async mode", message);
- }
- if (this.sendCallback != null) {
- this.sendCallback.onSuccess((Void) t);
- }
- }).subscribe();
- }
-
- private void waitingSendResponse(Mono mono, Message> message) {
- Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
-
- if (sendTimeout == null || sendTimeout < 0) {
- try {
- mono.block();
- } catch (Exception e) {
- throw new MessageDeliveryException(e.getMessage());
- }
- } else {
- try {
- mono.block(Duration.of(sendTimeout, ChronoUnit.MILLIS));
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} sent successfully in sync mode", message);
- }
- } catch (Exception e) {
- if (e.getCause() instanceof TimeoutException) {
- throw new MessageTimeoutException(message, "Timeout waiting for send event hub response");
- }
- throw new MessageDeliveryException(e.getMessage());
- }
-
- }
- }
-
- public void setSync(boolean sync) {
- this.sync = sync;
- LOGGER.info("DefaultMessageHandler sync becomes: {}", sync);
- }
-
- public void setSendTimeout(long sendTimeout) {
- setSendTimeoutExpression(new ValueExpression<>(sendTimeout));
- }
-
- public void setPartitionKey(String partitionKey) {
- setPartitionKeyExpression(new LiteralExpression(partitionKey));
- }
-
- public void setPartitionKeyExpression(Expression partitionKeyExpression) {
- this.partitionKeyExpression = partitionKeyExpression;
- }
-
- public void setPartitionIdExpression(Expression partitionIdExpression) {
- this.partitionIdExpression = partitionIdExpression;
- }
-
- private String toDestination(Message> message) {
- if (message.getHeaders().containsKey(AzureHeaders.NAME)) {
- return message.getHeaders().get(AzureHeaders.NAME, String.class);
- }
-
- return this.destination;
- }
-
- private PartitionSupplier toPartitionSupplier(Message> message) {
- PartitionSupplier partitionSupplier = new PartitionSupplier();
- // Priority setting partitionId
- String partitionId = getHeaderValue(message.getHeaders(), AzureHeaders.PARTITION_ID);
- if (!StringUtils.hasText(partitionId) && this.partitionIdExpression != null) {
- partitionId = this.partitionIdExpression.getValue(this.evaluationContext, message, String.class);
- }
- if (StringUtils.hasText(partitionId)) {
- partitionSupplier.setPartitionId(partitionId);
- } else {
- String partitionKey = getHeaderValue(message.getHeaders(), AzureHeaders.PARTITION_KEY);
- // The default key expression is the hash code of the payload.
- if (!StringUtils.hasText(partitionKey) && this.partitionKeyExpression != null) {
- partitionKey = this.partitionKeyExpression.getValue(this.evaluationContext, message, String.class);
- }
- if (StringUtils.hasText(partitionKey)) {
- partitionSupplier.setPartitionKey(partitionKey);
- }
- }
- return partitionSupplier;
- }
-
- /**
- * Get header value from MessageHeaders
- * @param headers MessageHeaders
- * @param keyName Key name
- * @return String header value
- */
- private String getHeaderValue(MessageHeaders headers, String keyName) {
- return headers.keySet().stream()
- .filter(header -> keyName.equals(header))
- .map(key -> String.valueOf(headers.get(key)))
- .findAny()
- .orElse(null);
- }
-
- private Map buildPropertiesMap() {
- Map properties = new HashMap<>();
- properties.put("sync", sync);
- properties.put("sendTimeout", sendTimeoutExpression);
- properties.put("destination", destination);
- return properties;
- }
-
- public void setSendCallback(ListenableFutureCallback callback) {
- this.sendCallback = callback;
- }
-
- public Expression getSendTimeoutExpression() {
- return sendTimeoutExpression;
- }
-
- public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
- Assert.notNull(sendTimeoutExpression, "'sendTimeoutExpression' must not be null");
- this.sendTimeoutExpression = sendTimeoutExpression;
- LOGGER.info("DefaultMessageHandler syncTimeout becomes: {}", sendTimeoutExpression);
- }
-
- protected MessageChannel getSendFailureChannel() {
- if (this.sendFailureChannel != null) {
- return this.sendFailureChannel;
- } else if (this.sendFailureChannelName != null) {
- this.sendFailureChannel = getChannelResolver().resolveDestination(this.sendFailureChannelName);
- return this.sendFailureChannel;
- }
-
- return null;
- }
-
- public void setSendFailureChannel(MessageChannel sendFailureChannel) {
- this.sendFailureChannel = sendFailureChannel;
- }
-
- public void setSendFailureChannelName(String sendFailureChannelName) {
- this.sendFailureChannelName = sendFailureChannelName;
- }
-
- protected ErrorMessageStrategy getErrorMessageStrategy() {
- return this.errorMessageStrategy;
- }
-
- public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
- Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' must not be null");
- this.errorMessageStrategy = errorMessageStrategy;
- }
-}
diff --git a/sdk/spring/azure-spring-integration-core/src/main/java/com/azure/spring/integration/handler/reactor/package-info.java b/sdk/spring/azure-spring-integration-core/src/main/java/com/azure/spring/integration/handler/reactor/package-info.java
deleted file mode 100644
index 0d184bb2f6422..0000000000000
--- a/sdk/spring/azure-spring-integration-core/src/main/java/com/azure/spring/integration/handler/reactor/package-info.java
+++ /dev/null
@@ -1,7 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-/**
- * Package com.azure.spring.integration.handler.reactor;
- */
-package com.azure.spring.integration.handler.reactor;
diff --git a/sdk/spring/azure-spring-integration-core/src/test/java/com/azure/spring/integration/handler/MessageHandlerTest.java b/sdk/spring/azure-spring-integration-core/src/test/java/com/azure/spring/integration/handler/MessageHandlerTest.java
index 084aad7b065c4..ea17204695e7b 100644
--- a/sdk/spring/azure-spring-integration-core/src/test/java/com/azure/spring/integration/handler/MessageHandlerTest.java
+++ b/sdk/spring/azure-spring-integration-core/src/test/java/com/azure/spring/integration/handler/MessageHandlerTest.java
@@ -6,105 +6,80 @@
import com.azure.spring.messaging.AzureHeaders;
import com.azure.spring.messaging.PartitionSupplier;
import com.azure.spring.messaging.core.SendOperation;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.expression.Expression;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.concurrent.ListenableFutureCallback;
+import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.spy;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public abstract class MessageHandlerTest {
- @SuppressWarnings("unchecked")
- protected O sendOperation = null;
-
- protected DefaultMessageHandler handler = null;
protected String destination = "dest";
protected String dynamicDestination = "dynamicName";
-
- @SuppressWarnings("unchecked")
- protected CompletableFuture future = new CompletableFuture<>();
+ protected DefaultMessageHandler handler;
+ protected Mono mono = Mono.empty();
+ protected O sendOperation;
private Message> message;
private String payload = "payload";
- public abstract void setUp();
- protected O getSendOperation() {
- return sendOperation;
- }
-
- protected void setSendOperation(O sendOperation) {
- this.sendOperation = sendOperation;
- }
-
- protected DefaultMessageHandler getHandler() {
- return handler;
- }
-
- protected void setHandler(DefaultMessageHandler handler) {
- this.handler = handler;
+ public MessageHandlerTest() {
+ Map valueMap = new HashMap<>(2);
+ valueMap.put("key1", "value1");
+ valueMap.put("key2", "value2");
+ message = new GenericMessage<>("testPayload", valueMap);
}
+ public abstract void setUp();
- protected String getDestination() {
- return destination;
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testSend() {
+ this.handler.handleMessage(this.message);
+ verify(this.sendOperation, times(1)).sendAsync(eq(destination), isA(Message.class),
+ isA(PartitionSupplier.class));
}
- protected void setDestination(String destination) {
- this.destination = destination;
- }
+ @Test
+ public void testSendCallback() {
+ ListenableFutureCallback callbackSpy = spy(new ListenableFutureCallback() {
+ @Override
+ public void onFailure(Throwable ex) {
+ }
- protected String getDynamicDestination() {
- return dynamicDestination;
- }
+ @Override
+ public void onSuccess(Void v) {
+ }
+ });
- protected void setDynamicDestination(String dynamicDestination) {
- this.dynamicDestination = dynamicDestination;
- }
+ this.handler.setSendCallback(callbackSpy);
- protected CompletableFuture getFuture() {
- return future;
- }
+ this.handler.handleMessage(this.message);
- protected void setFuture(CompletableFuture future) {
- this.future = future;
+ verify(callbackSpy, times(1)).onSuccess(eq(null));
}
-
- public MessageHandlerTest() {
- Map valueMap = new HashMap<>(2);
- valueMap.put("key1", "value1");
- valueMap.put("key2", "value2");
- message = new GenericMessage<>("testPayload", valueMap); }
-
- @SuppressWarnings("unchecked")
@Test
- public void testSend() {
- this.handler.handleMessage(this.message);
- verify(this.sendOperation, times(1))
- .sendAsync(eq(destination), isA(Message.class), isA(PartitionSupplier.class));
- }
-
@SuppressWarnings("unchecked")
- @Test
public void testSendDynamicTopic() {
Map headers = new HashMap<>(1);
headers.put(AzureHeaders.NAME, dynamicDestination);
Message> dynamicMessage = new GenericMessage<>(payload, headers);
this.handler.handleMessage(dynamicMessage);
- verify(this.sendOperation, times(1))
- .sendAsync(eq(dynamicDestination), isA(Message.class), isA(PartitionSupplier.class));
+ verify(this.sendOperation, times(1)).sendAsync(eq(dynamicDestination), isA(Message.class),
+ isA(PartitionSupplier.class));
}
@Test
@@ -117,34 +92,39 @@ public void testSendSync() {
verify(timeout, times(1)).getValue(eq(null), eq(this.message), eq(Long.class));
}
- @SuppressWarnings("unchecked")
@Test
+ @SuppressWarnings("unchecked")
public void testSendTimeout() {
- when(this.sendOperation.sendAsync(eq(this.destination), isA(Message.class), isA(PartitionSupplier.class)))
- .thenReturn(new CompletableFuture<>());
+ when(this.sendOperation.sendAsync(eq(this.destination), isA(Message.class),
+ isA(PartitionSupplier.class))).thenReturn(Mono.empty().timeout(Mono.empty()));
this.handler.setSync(true);
this.handler.setSendTimeout(1);
- Assertions.assertThrows(MessageTimeoutException.class, () -> this.handler.handleMessage(this.message));
- }
- @Test
- public void testSendCallback() {
- ListenableFutureCallback callbackSpy = spy(new ListenableFutureCallback() {
- @Override
- public void onFailure(Throwable ex) {
+ assertThrows(MessageTimeoutException.class, () -> this.handler.handleMessage(this.message));
+ }
- }
+ public Mono getMono() {
+ return mono;
+ }
- @Override
- public void onSuccess(Void v) {
+ public void setMono(Mono mono) {
+ this.mono = mono;
+ }
- }
- });
+ public DefaultMessageHandler getHandler() {
+ return handler;
+ }
- this.handler.setSendCallback(callbackSpy);
+ public void setHandler(DefaultMessageHandler handler) {
+ this.handler = handler;
+ }
- this.handler.handleMessage(this.message);
+ public O getSendOperation() {
+ return sendOperation;
+ }
- verify(callbackSpy, times(1)).onSuccess(eq(null));
+ public void setSendOperation(O sendOperation) {
+ this.sendOperation = sendOperation;
}
+
}
diff --git a/sdk/spring/azure-spring-integration-core/src/test/java/com/azure/spring/integration/handler/reactor/MessageHandlerTest.java b/sdk/spring/azure-spring-integration-core/src/test/java/com/azure/spring/integration/handler/reactor/MessageHandlerTest.java
deleted file mode 100644
index 77bf3f0a7453f..0000000000000
--- a/sdk/spring/azure-spring-integration-core/src/test/java/com/azure/spring/integration/handler/reactor/MessageHandlerTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.spring.integration.handler.reactor;
-
-import com.azure.spring.messaging.AzureHeaders;
-import com.azure.spring.messaging.PartitionSupplier;
-import com.azure.spring.messaging.core.reactor.SendOperation;
-import org.junit.jupiter.api.Test;
-import org.springframework.expression.Expression;
-import org.springframework.integration.MessageTimeoutException;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.support.GenericMessage;
-import org.springframework.util.concurrent.ListenableFutureCallback;
-import reactor.core.publisher.Mono;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-public abstract class MessageHandlerTest {
-
- protected String destination = "dest";
- protected String dynamicDestination = "dynamicName";
- protected DefaultMessageHandler handler;
- protected Mono mono = Mono.empty();
- protected O sendOperation;
- private Message> message;
- private String payload = "payload";
-
-
- public MessageHandlerTest() {
- Map valueMap = new HashMap<>(2);
- valueMap.put("key1", "value1");
- valueMap.put("key2", "value2");
- message = new GenericMessage<>("testPayload", valueMap);
- }
- public abstract void setUp();
-
- @Test
- @SuppressWarnings("unchecked")
- public void testSend() {
- this.handler.handleMessage(this.message);
- verify(this.sendOperation, times(1)).sendAsync(eq(destination), isA(Message.class),
- isA(PartitionSupplier.class));
- }
-
- @Test
- public void testSendCallback() {
- ListenableFutureCallback callbackSpy = spy(new ListenableFutureCallback() {
- @Override
- public void onFailure(Throwable ex) {
- }
-
- @Override
- public void onSuccess(Void v) {
- }
- });
-
- this.handler.setSendCallback(callbackSpy);
-
- this.handler.handleMessage(this.message);
-
- verify(callbackSpy, times(1)).onSuccess(eq(null));
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testSendDynamicTopic() {
- Map headers = new HashMap<>(1);
- headers.put(AzureHeaders.NAME, dynamicDestination);
- Message> dynamicMessage = new GenericMessage<>(payload, headers);
- this.handler.handleMessage(dynamicMessage);
- verify(this.sendOperation, times(1)).sendAsync(eq(dynamicDestination), isA(Message.class),
- isA(PartitionSupplier.class));
- }
-
- @Test
- public void testSendSync() {
- this.handler.setSync(true);
- Expression timeout = spy(this.handler.getSendTimeoutExpression());
- this.handler.setSendTimeoutExpression(timeout);
-
- this.handler.handleMessage(this.message);
- verify(timeout, times(1)).getValue(eq(null), eq(this.message), eq(Long.class));
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testSendTimeout() {
- when(this.sendOperation.sendAsync(eq(this.destination), isA(Message.class),
- isA(PartitionSupplier.class))).thenReturn(Mono.empty().timeout(Mono.empty()));
- this.handler.setSync(true);
- this.handler.setSendTimeout(1);
-
- assertThrows(MessageTimeoutException.class, () -> this.handler.handleMessage(this.message));
- }
-
- public Mono getMono() {
- return mono;
- }
-
- public void setMono(Mono mono) {
- this.mono = mono;
- }
-
- public DefaultMessageHandler getHandler() {
- return handler;
- }
-
- public void setHandler(DefaultMessageHandler handler) {
- this.handler = handler;
- }
-
- public O getSendOperation() {
- return sendOperation;
- }
-
- public void setSendOperation(O sendOperation) {
- this.sendOperation = sendOperation;
- }
-
-}
diff --git a/sdk/spring/azure-spring-integration-eventhubs/src/test/java/com/azure/spring/integration/eventhubs/outbound/EventHubMessageHandlerTest.java b/sdk/spring/azure-spring-integration-eventhubs/src/test/java/com/azure/spring/integration/eventhubs/outbound/EventHubMessageHandlerTest.java
index 941ba6e7c73a8..b42f2dea7dc3e 100644
--- a/sdk/spring/azure-spring-integration-eventhubs/src/test/java/com/azure/spring/integration/eventhubs/outbound/EventHubMessageHandlerTest.java
+++ b/sdk/spring/azure-spring-integration-eventhubs/src/test/java/com/azure/spring/integration/eventhubs/outbound/EventHubMessageHandlerTest.java
@@ -5,9 +5,9 @@
import com.azure.spring.messaging.AzureHeaders;
import com.azure.spring.messaging.PartitionSupplier;
-import com.azure.spring.integration.handler.reactor.DefaultMessageHandler;
+import com.azure.spring.integration.handler.DefaultMessageHandler;
import com.azure.spring.eventhubs.core.EventHubOperation;
-import com.azure.spring.integration.handler.reactor.MessageHandlerTest;
+import com.azure.spring.integration.handler.MessageHandlerTest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
diff --git a/sdk/spring/azure-spring-integration-servicebus/src/test/java/com/azure/spring/integration/servicebus/outbound/ServiceBusMessageHandlerTest.java b/sdk/spring/azure-spring-integration-servicebus/src/test/java/com/azure/spring/integration/servicebus/outbound/ServiceBusMessageHandlerTest.java
index afcd8ed49edc9..8a0412008998d 100644
--- a/sdk/spring/azure-spring-integration-servicebus/src/test/java/com/azure/spring/integration/servicebus/outbound/ServiceBusMessageHandlerTest.java
+++ b/sdk/spring/azure-spring-integration-servicebus/src/test/java/com/azure/spring/integration/servicebus/outbound/ServiceBusMessageHandlerTest.java
@@ -28,13 +28,12 @@ public class ServiceBusMessageHandlerTest extends MessageHandlerTest> success;
- private final Supplier> fail;
- public AzureCheckpointer(@NonNull Supplier> success) {
+ private final Supplier> success;
+ private final Supplier> fail;
+
+ public AzureCheckpointer(@NonNull Supplier> success) {
this(success, null);
}
- public AzureCheckpointer(@NonNull Supplier> success,
- Supplier> fail) {
+ public AzureCheckpointer(@NonNull Supplier> success,
+ Supplier> fail) {
this.success = success;
this.fail = fail;
}
@Override
- public CompletableFuture success() {
+ public Mono success() {
return this.success.get();
}
@Override
- public CompletableFuture failure() {
+ public Mono failure() {
if (this.fail == null) {
throw new UnsupportedOperationException("Fail current message unsupported");
}
diff --git a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/checkpoint/Checkpointer.java b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/checkpoint/Checkpointer.java
index f902346828312..299c81a0ddbaa 100644
--- a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/checkpoint/Checkpointer.java
+++ b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/checkpoint/Checkpointer.java
@@ -3,27 +3,24 @@
package com.azure.spring.messaging.checkpoint;
-import java.util.concurrent.CompletableFuture;
+import reactor.core.publisher.Mono;
/**
* A callback to perform checkpoint.
*
- * @author Warren Zhu
+ * @author Xiaolu Dai
*/
public interface Checkpointer {
/**
* Acknowledge success of current message. Please check result to detect failure
- * @return completable future instance
+ * @return Mono Void
*/
- CompletableFuture success();
+ Mono success();
/**
* Fail current message. Please check result to detect failure
- * @return completable future instance
+ * @return Mono Void
*/
- CompletableFuture failure();
-
-
-
+ Mono failure();
}
diff --git a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/checkpoint/reactor/AzureCheckpointer.java b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/checkpoint/reactor/AzureCheckpointer.java
deleted file mode 100644
index 61bb8291171f1..0000000000000
--- a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/checkpoint/reactor/AzureCheckpointer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.spring.messaging.checkpoint.reactor;
-
-import org.springframework.lang.NonNull;
-import reactor.core.publisher.Mono;
-
-import java.util.function.Supplier;
-
-/**
- * Azure implementation for check point callback.
- */
-public class AzureCheckpointer implements Checkpointer {
-
- private final Supplier> success;
- private final Supplier> fail;
-
- public AzureCheckpointer(@NonNull Supplier> success) {
- this(success, null);
- }
-
- public AzureCheckpointer(@NonNull Supplier> success,
- Supplier> fail) {
- this.success = success;
- this.fail = fail;
- }
-
- @Override
- public Mono success() {
- return this.success.get();
- }
-
- @Override
- public Mono failure() {
- if (this.fail == null) {
- throw new UnsupportedOperationException("Fail current message unsupported");
- }
- return this.fail.get();
- }
-}
diff --git a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/checkpoint/reactor/Checkpointer.java b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/checkpoint/reactor/Checkpointer.java
deleted file mode 100644
index 85339e3c4ff14..0000000000000
--- a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/checkpoint/reactor/Checkpointer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.spring.messaging.checkpoint.reactor;
-
-import reactor.core.publisher.Mono;
-
-/**
- * A callback to perform checkpoint.
- *
- * @author Xiaolu Dai
- */
-public interface Checkpointer {
-
- /**
- * Acknowledge success of current message. Please check result to detect failure
- * @return Mono Void
- */
- Mono success();
-
- /**
- * Fail current message. Please check result to detect failure
- * @return Mono Void
- */
- Mono failure();
-}
diff --git a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/checkpoint/reactor/package-info.java b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/checkpoint/reactor/package-info.java
deleted file mode 100644
index f97093f706673..0000000000000
--- a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/checkpoint/reactor/package-info.java
+++ /dev/null
@@ -1,7 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-/**
- * Package com.azure.spring.messaging.checkpoint.reactor;
- */
-package com.azure.spring.messaging.checkpoint.reactor;
diff --git a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/BatchSendOperation.java b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/BatchSendOperation.java
index 70df72f113c6f..27ca4cc4cfa5a 100644
--- a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/BatchSendOperation.java
+++ b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/BatchSendOperation.java
@@ -5,37 +5,58 @@
import com.azure.spring.messaging.PartitionSupplier;
import org.springframework.messaging.Message;
+import reactor.core.publisher.Mono;
import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
/**
* Operations for sending {@link Collection}<{@link Message}> to a destination.
*
- * @author Warren Zhu
+ * @author Xiaolu Dai
*/
public interface BatchSendOperation {
/**
- * Send a {@link Collection}<{@link Message}> to the given destination with a given partition supplier.
+ * Send a {@link Collection}<{@link Message}> to the given destination with a given partition supplier asynchronously.
* @param destination destination
- * @param messages message
+ * @param messages message set
* @param partitionSupplier partition supplier
- * @param payload class type in message
- * @return Future instance
+ * @param payload type in message
+ * @return Mono Void
*/
- CompletableFuture sendAsync(String destination, Collection> messages,
- PartitionSupplier partitionSupplier);
+ Mono sendAsync(String destination, Collection> messages,
+ PartitionSupplier partitionSupplier);
/**
- * Send a {@link Collection}<{@link Message}> to the given destination.
+ * Send a {@link Collection}<{@link Message}> to the given destination asynchronously.
* @param destination destination
- * @param messages messages
- * @param payload class type in message
- * @return Future instance
+ * @param messages message set
+ * @param payload type in message
+ * @return Mono Void
*/
- default CompletableFuture sendAsync(String destination, Collection> messages) {
+ default Mono sendAsync(String destination, Collection> messages) {
return sendAsync(destination, messages, null);
}
+
+ /**
+ * Send a {@link Collection}<{@link Message}> to the given destination with a given partition supplier synchronously.
+ * @param destination destination
+ * @param messages message set
+ * @param partitionSupplier partition supplier
+ * @param payload type in message
+ */
+ default void send(String destination, Collection> messages, PartitionSupplier partitionSupplier) {
+ sendAsync(destination, messages, partitionSupplier).block();
+ }
+
+ /**
+ * Send a {@link Collection}<{@link Message}> to the given destination synchronously.
+ * @param destination destination
+ * @param messages message set
+ * @param payload type in message
+ */
+ default void send(String destination, Collection> messages) {
+ send(destination, messages, null);
+ }
}
diff --git a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/reactor/ReceiveOperation.java b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/ReceiveOperation.java
similarity index 71%
rename from sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/reactor/ReceiveOperation.java
rename to sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/ReceiveOperation.java
index b3236092618ce..a0978466d8829 100644
--- a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/reactor/ReceiveOperation.java
+++ b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/ReceiveOperation.java
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
-package com.azure.spring.messaging.core.reactor;
+package com.azure.spring.messaging.core;
import com.azure.spring.messaging.checkpoint.CheckpointMode;
import org.springframework.messaging.Message;
@@ -17,12 +17,21 @@
public interface ReceiveOperation {
/**
- * Receive a message from destination async.
+ * Receive a message from destination asynchronously.
* @param destination destination
* @return {@link Mono} of the next available {@link Message} or {@code null} if empty
*/
Mono> receiveAsync(String destination);
+ /**
+ * Receive a message from destination synchronously.
+ * @param destination destination
+ * @return received {@link Message} or {@code null} if empty
+ */
+ default Object receive(String destination) {
+ return receiveAsync(destination).block();
+ }
+
/**
* Set message payload type. Default is {@code byte[]}
* @param messagePayloadType message payload type
diff --git a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/RxSendOperation.java b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/RxSendOperation.java
deleted file mode 100644
index c507c67a1e01d..0000000000000
--- a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/RxSendOperation.java
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.spring.messaging.core;
-
-import com.azure.spring.messaging.PartitionSupplier;
-import org.springframework.messaging.Message;
-import rx.Observable;
-
-/**
- * Operations for sending {@link Message} to a destination in a reactive way.
- *
- * @author Warren Zhu
- */
-public interface RxSendOperation {
-
- /**
- * Send a {@link Message} to the given destination with a given partition supplier.
- * @param destination destination
- * @param message message
- * @param partitionSupplier partition supplier
- * @param payload type in message
- * @return observable instance
- */
- Observable sendRx(String destination, Message message, PartitionSupplier partitionSupplier);
-
- /**
- * Send a {@link Message} to the given destination.
- * @param destination destination
- * @param message message
- * @param payload type in message
- * @return observable instance
- */
- default Observable sendRx(String destination, Message message) {
- return sendRx(destination, message, null);
- }
-}
diff --git a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/RxSubscribeByGroupOperation.java b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/RxSubscribeByGroupOperation.java
deleted file mode 100644
index 74825132ea80d..0000000000000
--- a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/RxSubscribeByGroupOperation.java
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.spring.messaging.core;
-
-import com.azure.spring.messaging.checkpoint.Checkpointable;
-import org.springframework.messaging.Message;
-import rx.Observable;
-
-/**
- * Operations for subscribing to a destination with a consumer group in a reactive way.
- *
- * @author Warren Zhu
- */
-public interface RxSubscribeByGroupOperation extends Checkpointable {
-
- /**
- * Register a message consumer to a given destination with a given consumer group.
- * @param destination destination
- * @param consumerGroup consumer group
- * @param messagePayloadType message payload type
- * @return {@code Observable>}
- */
- Observable> subscribe(String destination, String consumerGroup,
- Class> messagePayloadType);
-}
diff --git a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/RxSubscribeOperation.java b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/RxSubscribeOperation.java
deleted file mode 100644
index 0ba4075782e8e..0000000000000
--- a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/RxSubscribeOperation.java
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.spring.messaging.core;
-
-import com.azure.spring.messaging.checkpoint.CheckpointMode;
-import org.springframework.messaging.Message;
-import rx.Observable;
-
-/**
- * Operations for subscribing to a destination in reactive way.
- *
- * @author Warren Zhu
- */
-public interface RxSubscribeOperation {
-
- /**
- * Register a message consumer to a given destination.
- * @param destination destination
- * @param messagePayloadType message payload type
- * @return {@code Observable>}
- */
- Observable> subscribe(String destination, Class> messagePayloadType);
-
- void setCheckpointMode(CheckpointMode checkpointMode);
-}
diff --git a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/SendOperation.java b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/SendOperation.java
index 484b71030354d..281011bda8086 100644
--- a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/SendOperation.java
+++ b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/SendOperation.java
@@ -5,35 +5,55 @@
import com.azure.spring.messaging.PartitionSupplier;
import org.springframework.messaging.Message;
-
-import java.util.concurrent.CompletableFuture;
+import reactor.core.publisher.Mono;
/**
* Operations for sending {@link Message} to a destination.
*
- * @author Warren Zhu
+ * @author Xiaolu Dai
*/
public interface SendOperation {
/**
- * Send a {@link Message} to the given destination with a given partition supplier.
+ * Send a {@link Message} to the given destination with a given partition supplier asynchronously.
* @param destination destination
* @param message message
* @param partitionSupplier partition supplier
- * @param payload type in message
- * @return future instance
+ * @param payload class in message
+ * @return Mono Void
*/
- CompletableFuture sendAsync(String destination, Message message, PartitionSupplier partitionSupplier);
+ Mono sendAsync(String destination, Message message, PartitionSupplier partitionSupplier);
/**
- * Send a {@link Message} to the given destination.
+ * Send a {@link Message} to the given destination asynchronously.
* @param destination destination
* @param message message
- * @param payload type in message
- * @return future instance
+ * @param payload class in message
+ * @return Mono Void
*/
- default CompletableFuture sendAsync(String destination, Message message) {
+ default Mono sendAsync(String destination, Message message) {
return sendAsync(destination, message, null);
}
+
+ /**
+ * Send a {@link Message} to the given destination with a given partition supplier synchronously.
+ * @param destination destination
+ * @param message message
+ * @param partitionSupplier partition supplier
+ * @param payload class in message
+ */
+ default void send(String destination, Message message, PartitionSupplier partitionSupplier) {
+ sendAsync(destination, message, partitionSupplier).block();
+ }
+
+ /**
+ * Send a {@link Message} to the given destination synchronously.
+ * @param destination destination
+ * @param message message
+ * @param payload class in message
+ */
+ default void send(String destination, Message message) {
+ send(destination, message, null);
+ }
}
diff --git a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/reactor/BatchSendOperation.java b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/reactor/BatchSendOperation.java
deleted file mode 100644
index ec0d9052951ff..0000000000000
--- a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/reactor/BatchSendOperation.java
+++ /dev/null
@@ -1,41 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.spring.messaging.core.reactor;
-
-import com.azure.spring.messaging.PartitionSupplier;
-import org.springframework.messaging.Message;
-import reactor.core.publisher.Mono;
-
-import java.util.Collection;
-
-
-/**
- * Operations for sending {@link Collection}<{@link Message}> to a destination.
- *
- * @author Xiaolu Dai
- */
-public interface BatchSendOperation {
-
- /**
- * Send a {@link Collection}<{@link Message}> to the given destination with a given partition supplier.
- * @param destination destination
- * @param messages message set
- * @param partitionSupplier partition supplier
- * @param payload type in message
- * @return Mono Void
- */
- Mono sendAsync(String destination, Collection> messages,
- PartitionSupplier partitionSupplier);
-
- /**
- * Send a {@link Collection}<{@link Message}> to the given destination.
- * @param destination destination
- * @param messages message set
- * @param payload type in message
- * @return Mono Void
- */
- default Mono sendAsync(String destination, Collection> messages) {
- return sendAsync(destination, messages, null);
- }
-}
diff --git a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/reactor/SendOperation.java b/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/reactor/SendOperation.java
deleted file mode 100644
index 4a1325f3c555b..0000000000000
--- a/sdk/spring/azure-spring-messaging/src/main/java/com/azure/spring/messaging/core/reactor/SendOperation.java
+++ /dev/null
@@ -1,38 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.spring.messaging.core.reactor;
-
-import com.azure.spring.messaging.PartitionSupplier;
-import org.springframework.messaging.Message;
-import reactor.core.publisher.Mono;
-
-
-/**
- * Operations for sending {@link Message} to a destination.
- *
- * @author Xiaolu Dai
- */
-public interface SendOperation {
-
- /**
- * Send a {@link Message} to the given destination with a given partition supplier.
- * @param destination destination
- * @param message message
- * @param partitionSupplier partition supplier
- * @param payload class in message
- * @return Mono Void
- */
- Mono sendAsync(String destination, Message message, PartitionSupplier partitionSupplier);
-
- /**
- * Send a {@link Message} to the given destination.
- * @param destination destination
- * @param message message
- * @param payload class in message
- * @return Mono Void
- */
- default Mono sendAsync(String destination, Message message) {
- return sendAsync(destination, message, null);
- }
-}
diff --git a/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/SendOperationTest.java b/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/SendOperationTest.java
index 399dfd4c473fd..c17d8c08d7252 100644
--- a/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/SendOperationTest.java
+++ b/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/SendOperationTest.java
@@ -3,8 +3,6 @@
package com.azure.spring.messaging.core;
-import com.azure.spring.messaging.PartitionSupplier;
-import org.assertj.core.api.Fail;
import org.junit.jupiter.api.Test;
import org.springframework.core.NestedRuntimeException;
import org.springframework.messaging.Message;
@@ -13,22 +11,20 @@
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import static org.assertj.core.api.Fail.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
public abstract class SendOperationTest {
+ protected String consumerGroup = "consumer-group";
protected String destination = "event-hub";
protected Message> message;
protected Mono mono = Mono.empty();
- protected String partitionKey = "key";
protected String payload = "payload";
- protected O sendOperation = null;
- private String partitionId = "1";
+ protected O sendOperation;
public SendOperationTest() {
Map valueMap = new HashMap<>(2);
@@ -39,105 +35,50 @@ public SendOperationTest() {
protected abstract void setupError(String errorMessage);
- @Test
- public void testSendCreateSenderFailure() throws Throwable {
- whenSendWithException();
-
- assertThrows(NestedRuntimeException.class,
- () -> this.sendOperation.sendAsync(destination, this.message, null).get());
- }
@Test
- public void testSendFailure() {
- setupError("future failed.");
- CompletableFuture future = this.sendOperation.sendAsync(destination, this.message, null);
-
- try {
- future.get();
- Fail.fail("Test should fail.");
- } catch (InterruptedException ie) {
- Fail.fail("get() should fail with an ExecutionException.");
- } catch (ExecutionException ee) {
- assertEquals("future failed.", ee.getCause().getMessage());
- }
- }
-
- @Test
- public void testSendWithPartitionId() throws ExecutionException, InterruptedException {
- PartitionSupplier partitionSupplier = new PartitionSupplier();
- partitionSupplier.setPartitionId(partitionId);
- CompletableFuture future = this.sendOperation.sendAsync(destination, message, partitionSupplier);
-
- assertNull(future.get());
- verifySendWithPartitionId(1);
- verifyPartitionSenderCalled(1);
- }
-
- @Test
- public void testSendWithPartitionKey() throws ExecutionException, InterruptedException {
- PartitionSupplier partitionSupplier = new PartitionSupplier();
- partitionSupplier.setPartitionKey(partitionKey);
- CompletableFuture future = this.sendOperation.sendAsync(destination, message, partitionSupplier);
-
- assertNull(future.get());
- verifySendWithPartitionKey(1);
- verifyGetClientCreator(1);
- }
-
- @Test
- public void testSendWithSessionId() throws ExecutionException, InterruptedException {
- Map valueMap = new HashMap<>();
- valueMap.put("key1", "value1");
- valueMap.put("key2", "value2");
- valueMap.put("azure_service_bus_session_id", "TestSessionId");
- Message> messageWithSeesionId = new GenericMessage<>("testPayload", valueMap);
- CompletableFuture future = this.sendOperation.sendAsync(destination, messageWithSeesionId);
+ public void testSend() {
+ final Mono mono = this.sendOperation.sendAsync(destination, message, null);
- assertNull(future.get());
+ assertNull(mono.block());
verifySendCalled(1);
}
@Test
- public void testSendWithSessionIdAndPartitionKeyDifferent() throws ExecutionException, InterruptedException {
- Map valueMap = new HashMap<>();
- valueMap.put("key1", "value1");
- valueMap.put("key2", "value2");
- valueMap.put("azure_service_bus_session_id", "TestSessionId");
- valueMap.put("azure_service_bus_partition_key", "TestPartitionKey");
- Message> messageWithSeesionIdAndPartitionKey = new GenericMessage<>("testPayload", valueMap);
- CompletableFuture future = this.sendOperation.sendAsync(destination, messageWithSeesionIdAndPartitionKey);
-
- assertNull(future.get());
- verifySendCalled(1);
- }
-
- @Test
- public void testSendWithoutPartition() throws ExecutionException, InterruptedException {
- CompletableFuture future = this.sendOperation.sendAsync(destination, message, new PartitionSupplier());
+ public void testSendCreateSenderFailure() {
+ whenSendWithException();
- assertNull(future.get());
- verifySendCalled(1);
+ assertThrows(NestedRuntimeException.class, () -> this.sendOperation.sendAsync(destination, this.message,
+ null).block());
}
@Test
- public void testSendWithoutPartitionSupplier() throws ExecutionException, InterruptedException {
- CompletableFuture future = this.sendOperation.sendAsync(destination, message, null);
+ public void testSendFailure() {
+ String errorMessage = "Send failed.";
+ setupError(errorMessage);
+ Mono mono = this.sendOperation.sendAsync(destination, this.message, null);
- assertNull(future.get());
- verifySendCalled(1);
+ try {
+ mono.block();
+ fail("Test should fail.");
+ } catch (Exception e) {
+ assertEquals(errorMessage, e.getMessage());
+ }
}
protected abstract void verifyGetClientCreator(int times);
- protected abstract void verifyPartitionSenderCalled(int times);
-
protected abstract void verifySendCalled(int times);
- protected abstract void verifySendWithPartitionId(int times);
+ protected abstract void whenSendWithException();
- protected abstract void verifySendWithPartitionKey(int times);
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
- protected abstract void whenSendWithException();
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
public Mono getMono() {
return mono;
diff --git a/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/reactor/SendSubscribeByGroupOperationTest.java b/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/SendSubscribeByGroupOperationTest.java
similarity index 87%
rename from sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/reactor/SendSubscribeByGroupOperationTest.java
rename to sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/SendSubscribeByGroupOperationTest.java
index d11ab8e93699c..23d22a95a33ca 100644
--- a/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/reactor/SendSubscribeByGroupOperationTest.java
+++ b/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/SendSubscribeByGroupOperationTest.java
@@ -1,10 +1,9 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
-package com.azure.spring.messaging.core.reactor;
+package com.azure.spring.messaging.core;
import com.azure.spring.messaging.checkpoint.CheckpointConfig;
-import com.azure.spring.messaging.core.SubscribeByGroupOperation;
import org.springframework.messaging.Message;
import java.util.function.Consumer;
diff --git a/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/SendSubscribeOperationTest.java b/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/SendSubscribeOperationTest.java
index 2787b3f39187e..f68dcc2168edd 100644
--- a/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/SendSubscribeOperationTest.java
+++ b/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/SendSubscribeOperationTest.java
@@ -25,33 +25,30 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public abstract class SendSubscribeOperationTest {
- protected T sendSubscribeOperation;
+public abstract class SendSubscribeOperationTest {
- protected String partitionId = "1";
protected String destination = "test";
- protected String payload = "payload";
- protected User user = new User(payload);
- protected Map headers = new HashMap<>();
- protected Message userMessage = new GenericMessage<>(user, headers);
-
+ protected String partitionId = "1";
+ protected T sendSubscribeOperation;
+ private Map headers = new HashMap<>();
protected List> messages = IntStream.range(1, 5)
.mapToObj(String::valueOf)
.map(User::new)
.map(u -> new GenericMessage<>(u, headers))
.collect(Collectors.toList());
- private final Message stringMessage = new GenericMessage<>(payload, headers);
- private final Message byteMessage = new GenericMessage<>(payload.getBytes(StandardCharsets.UTF_8), headers);
+ private String payload = "payload";
+ private Message byteMessage = new GenericMessage<>(payload.getBytes(StandardCharsets.UTF_8), headers);
+ private Message stringMessage = new GenericMessage<>(payload, headers);
+ protected User user = new User(payload);
+ protected Message userMessage = new GenericMessage<>(user, headers);
+
+ protected abstract void setCheckpointConfig(CheckpointConfig checkpointConfig);
@BeforeEach
public abstract void setUp() throws Exception;
- @Test
- public void testSendString() {
- subscribe(destination, this::stringHandler, String.class);
- sendSubscribeOperation.sendAsync(destination, stringMessage);
- }
+ protected abstract void subscribe(String destination, Consumer> consumer, Class> payloadType);
@Test
public void testSendByte() {
@@ -59,12 +56,6 @@ public void testSendByte() {
sendSubscribeOperation.sendAsync(destination, byteMessage);
}
- @Test
- public void testSendUser() {
- subscribe(destination, this::userHandler, User.class);
- sendSubscribeOperation.sendAsync(destination, userMessage);
- }
-
@Test
public void testSendReceiveWithManualCheckpointMode() {
setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
@@ -80,6 +71,39 @@ public void testSendReceiveWithRecordCheckpointMode() {
verifyCheckpointSuccessCalled(messages.size());
}
+ @Test
+ public void testSendString() {
+ subscribe(destination, this::stringHandler, String.class);
+ sendSubscribeOperation.sendAsync(destination, stringMessage);
+ }
+
+ @Test
+ public void testSendUser() {
+ subscribe(destination, this::userHandler, User.class);
+ sendSubscribeOperation.sendAsync(destination, userMessage);
+ }
+
+ @Deprecated
+ protected abstract void verifyCheckpointBatchSuccessCalled(int times);
+
+ protected void verifyCheckpointFailure(Checkpointer checkpointer) {
+ checkpointer.failure();
+ verifyCheckpointFailureCalled(1);
+ }
+
+ protected abstract void verifyCheckpointFailureCalled(int times);
+
+ protected void verifyCheckpointSuccess(Checkpointer checkpointer) {
+ checkpointer.success();
+ verifyCheckpointSuccessCalled(1);
+ }
+
+ protected abstract void verifyCheckpointSuccessCalled(int times);
+
+ private void byteHandler(Message> message) {
+ assertEquals(payload, new String((byte[]) message.getPayload(), StandardCharsets.UTF_8));
+ }
+
protected void manualCheckpointHandler(Message> message) {
assertTrue(message.getHeaders().containsKey(AzureHeaders.CHECKPOINTER));
Checkpointer checkpointer = message.getHeaders().get(AzureHeaders.CHECKPOINTER, Checkpointer.class);
@@ -96,32 +120,16 @@ private void stringHandler(Message> message) {
assertEquals(payload, message.getPayload());
}
- private void byteHandler(Message> message) {
- assertEquals(payload, new String((byte[]) message.getPayload(), StandardCharsets.UTF_8));
- }
-
private void userHandler(Message> message) {
assertEquals(user, message.getPayload());
}
- protected abstract void verifyCheckpointSuccessCalled(int times);
-
- protected abstract void verifyCheckpointBatchSuccessCalled(int times);
-
- protected abstract void verifyCheckpointFailureCalled(int times);
-
- protected abstract void subscribe(String destination, Consumer> consumer, Class> payloadType);
-
- protected abstract void setCheckpointConfig(CheckpointConfig checkpointConfig);
-
- protected void verifyCheckpointSuccess(Checkpointer checkpointer) {
- checkpointer.success();
- verifyCheckpointSuccessCalled(1);
+ public T getSendSubscribeOperation() {
+ return sendSubscribeOperation;
}
- protected void verifyCheckpointFailure(Checkpointer checkpointer) {
- checkpointer.failure();
- verifyCheckpointFailureCalled(1);
+ public void setSendSubscribeOperation(T sendSubscribeOperation) {
+ this.sendSubscribeOperation = sendSubscribeOperation;
}
public String getPartitionId() {
@@ -132,14 +140,6 @@ public void setPartitionId(String partitionId) {
this.partitionId = partitionId;
}
- public T getSendSubscribeOperation() {
- return sendSubscribeOperation;
- }
-
- public void setSendSubscribeOperation(T sendSubscribeOperation) {
- this.sendSubscribeOperation = sendSubscribeOperation;
- }
-
protected void waitMillis(long millis) {
if (millis <= 0) {
millis = 30;
diff --git a/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/reactor/SendOperationTest.java b/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/reactor/SendOperationTest.java
deleted file mode 100644
index ba9439791226c..0000000000000
--- a/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/reactor/SendOperationTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.spring.messaging.core.reactor;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.core.NestedRuntimeException;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.support.GenericMessage;
-import reactor.core.publisher.Mono;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.assertj.core.api.Fail.fail;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-public abstract class SendOperationTest {
-
- protected String consumerGroup = "consumer-group";
- protected String destination = "event-hub";
- protected Message> message;
- protected Mono mono = Mono.empty();
- protected String payload = "payload";
- protected O sendOperation;
-
- public SendOperationTest() {
- Map valueMap = new HashMap<>(2);
- valueMap.put("key1", "value1");
- valueMap.put("key2", "value2");
- message = new GenericMessage<>("testPayload", valueMap);
- }
-
- protected abstract void setupError(String errorMessage);
-
-
- @Test
- public void testSend() {
- final Mono mono = this.sendOperation.sendAsync(destination, message, null);
-
- assertNull(mono.block());
- verifySendCalled(1);
- }
-
- @Test
- public void testSendCreateSenderFailure() {
- whenSendWithException();
-
- assertThrows(NestedRuntimeException.class, () -> this.sendOperation.sendAsync(destination, this.message,
- null).block());
- }
-
- @Test
- public void testSendFailure() {
- String errorMessage = "Send failed.";
- setupError(errorMessage);
- Mono mono = this.sendOperation.sendAsync(destination, this.message, null);
-
- try {
- mono.block();
- fail("Test should fail.");
- } catch (Exception e) {
- assertEquals(errorMessage, e.getMessage());
- }
- }
-
- protected abstract void verifyGetClientCreator(int times);
-
- protected abstract void verifySendCalled(int times);
-
- protected abstract void whenSendWithException();
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
-
- public Mono getMono() {
- return mono;
- }
-
- public void setMono(Mono mono) {
- this.mono = mono;
- }
-
- public String getPayload() {
- return payload;
- }
-
- public void setPayload(String payload) {
- this.payload = payload;
- }
-
- public O getSendOperation() {
- return sendOperation;
- }
-
- public void setSendOperation(O sendOperation) {
- this.sendOperation = sendOperation;
- }
-}
diff --git a/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/reactor/SendSubscribeOperationTest.java b/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/reactor/SendSubscribeOperationTest.java
deleted file mode 100644
index 3078c9f30d5c5..0000000000000
--- a/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/reactor/SendSubscribeOperationTest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.spring.messaging.core.reactor;
-
-import com.azure.spring.messaging.AzureHeaders;
-import com.azure.spring.messaging.checkpoint.CheckpointConfig;
-import com.azure.spring.messaging.checkpoint.CheckpointMode;
-import com.azure.spring.messaging.checkpoint.reactor.Checkpointer;
-import com.azure.spring.messaging.support.pojo.User;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.support.GenericMessage;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-
-public abstract class SendSubscribeOperationTest {
-
- protected String destination = "test";
- protected String partitionId = "1";
- protected T sendSubscribeOperation;
- private Map headers = new HashMap<>();
- protected List> messages = IntStream.range(1, 5)
- .mapToObj(String::valueOf)
- .map(User::new)
- .map(u -> new GenericMessage<>(u, headers))
- .collect(Collectors.toList());
- private String payload = "payload";
- private Message byteMessage = new GenericMessage<>(payload.getBytes(StandardCharsets.UTF_8), headers);
- private Message stringMessage = new GenericMessage<>(payload, headers);
- private User user = new User(payload);
- protected Message userMessage = new GenericMessage<>(user, headers);
-
- protected abstract void setCheckpointConfig(CheckpointConfig checkpointConfig);
-
- @BeforeEach
- public abstract void setUp();
-
- protected abstract void subscribe(String destination, Consumer> consumer, Class> payloadType);
-
- @Test
- public void testSendByte() {
- subscribe(destination, this::byteHandler, byte[].class);
- sendSubscribeOperation.sendAsync(destination, byteMessage);
- }
-
- @Test
- public void testSendReceiveWithManualCheckpointMode() {
- setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
- subscribe(destination, this::manualCheckpointHandler, User.class);
- sendSubscribeOperation.sendAsync(destination, userMessage);
- }
-
- @Test
- public void testSendReceiveWithRecordCheckpointMode() {
- setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.RECORD).build());
- subscribe(destination, this::recordCheckpointHandler, User.class);
- messages.forEach(m -> sendSubscribeOperation.sendAsync(destination, m));
- verifyCheckpointSuccessCalled(messages.size());
- }
-
- @Test
- public void testSendString() {
- subscribe(destination, this::stringHandler, String.class);
- sendSubscribeOperation.sendAsync(destination, stringMessage);
- }
-
- @Test
- public void testSendUser() {
- subscribe(destination, this::userHandler, User.class);
- sendSubscribeOperation.sendAsync(destination, userMessage);
- }
-
- @Deprecated
- protected abstract void verifyCheckpointBatchSuccessCalled(int times);
-
- protected void verifyCheckpointFailure(Checkpointer checkpointer) {
- checkpointer.failure();
- verifyCheckpointFailureCalled(1);
- }
-
- protected abstract void verifyCheckpointFailureCalled(int times);
-
- protected void verifyCheckpointSuccess(Checkpointer checkpointer) {
- checkpointer.success();
- verifyCheckpointSuccessCalled(1);
- }
-
- protected abstract void verifyCheckpointSuccessCalled(int times);
-
- private void byteHandler(Message> message) {
- assertEquals(payload, new String((byte[]) message.getPayload(), StandardCharsets.UTF_8));
- }
-
- private void manualCheckpointHandler(Message> message) {
- assertTrue(message.getHeaders().containsKey(AzureHeaders.CHECKPOINTER));
- Checkpointer checkpointer = message.getHeaders().get(AzureHeaders.CHECKPOINTER, Checkpointer.class);
- assertNotNull(checkpointer);
- verifyCheckpointSuccess(checkpointer);
- verifyCheckpointFailure(checkpointer);
- }
-
- private void recordCheckpointHandler(Message> message) {
- //
- }
-
- private void stringHandler(Message> message) {
- assertEquals(payload, message.getPayload());
- }
-
- private void userHandler(Message> message) {
- assertEquals(user, message.getPayload());
- }
-
- public T getSendSubscribeOperation() {
- return sendSubscribeOperation;
- }
-
- public void setSendSubscribeOperation(T sendSubscribeOperation) {
- this.sendSubscribeOperation = sendSubscribeOperation;
- }
-
- public String getPartitionId() {
- return partitionId;
- }
-
- public void setPartitionId(String partitionId) {
- this.partitionId = partitionId;
- }
-}
diff --git a/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/rx/RxSendSubscribeByGroupOperationTest.java b/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/rx/RxSendSubscribeByGroupOperationTest.java
deleted file mode 100644
index fe0eb21a33acd..0000000000000
--- a/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/rx/RxSendSubscribeByGroupOperationTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.spring.messaging.core.rx;
-
-import com.azure.spring.messaging.checkpoint.CheckpointConfig;
-import com.azure.spring.messaging.core.RxSendOperation;
-import com.azure.spring.messaging.core.RxSubscribeByGroupOperation;
-import org.springframework.messaging.Message;
-import rx.Observable;
-
-public abstract class RxSendSubscribeByGroupOperationTest
- extends RxSendSubscribeOperationTest {
-
- protected String consumerGroup = "group1";
-
- @Override
- protected Observable> subscribe(String destination, Class> payloadType) {
- return sendSubscribeOperation.subscribe(destination, consumerGroup, payloadType);
- }
-
- @Override
- protected void setCheckpointConfig(CheckpointConfig checkpointConfig) {
- sendSubscribeOperation.setCheckpointConfig(checkpointConfig);
- }
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
-
-}
diff --git a/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/rx/RxSendSubscribeOperationTest.java b/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/rx/RxSendSubscribeOperationTest.java
deleted file mode 100644
index d41d1d18daf95..0000000000000
--- a/sdk/spring/azure-spring-messaging/src/test/java/com/azure/spring/messaging/core/rx/RxSendSubscribeOperationTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-package com.azure.spring.messaging.core.rx;
-
-import com.azure.spring.messaging.checkpoint.CheckpointConfig;
-import com.azure.spring.messaging.checkpoint.CheckpointMode;
-import com.azure.spring.messaging.core.RxSendOperation;
-import com.azure.spring.messaging.support.pojo.User;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.support.GenericMessage;
-import rx.Observable;
-import rx.observers.AssertableSubscriber;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.IntStream;
-
-public abstract class RxSendSubscribeOperationTest {
-
- protected String destination = "test";
- protected String partitionId = "1";
- protected T sendSubscribeOperation;
- private Map headers = new HashMap<>();
- @SuppressWarnings("unchecked")
- protected Message[] messages = IntStream.range(1, 5)
- .mapToObj(String::valueOf)
- .map(User::new)
- .map(u -> new GenericMessage<>(u, headers))
- .toArray(Message[]::new);
- private String payload = "payload";
- private Message byteMessage = new GenericMessage<>(payload.getBytes(StandardCharsets.UTF_8), headers);
- private Message stringMessage = new GenericMessage<>(payload, headers);
- private User user = new User(payload);
- protected Message userMessage = new GenericMessage<>(user, headers);
-
- protected abstract void setCheckpointConfig(CheckpointConfig checkpointConfig);
-
- @BeforeEach
- public abstract void setUp();
-
- protected abstract Observable> subscribe(String destination, Class> payloadType);
-
- @Test
- public void testSendByte() {
- AssertableSubscriber subscriber = subscribe(destination, byte[].class).map(Message::getPayload)
- .cast(byte[].class)
- .map(String::new)
- .test();
- sendSubscribeOperation.sendRx(destination, byteMessage);
- subscriber.assertValue(payload).assertNoErrors();
- }
-
- @Test
- public void testSendReceiveWithManualCheckpointMode() {
- setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
-
- Observable> observable = subscribe(destination, User.class);
- AssertableSubscriber userSubscriber = observable.map(Message::getPayload).cast(User.class).test();
- sendSubscribeOperation.sendRx(destination, userMessage);
- userSubscriber.assertValue(user).assertNoErrors();
- verifyCheckpointSuccessCalled(0);
- }
-
- @Test
- public void testSendReceiveWithRecordCheckpointMode() {
- setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.RECORD).build());
- AssertableSubscriber subscriber = subscribe(destination, User.class).map(Message::getPayload)
- .cast(User.class)
- .test();
- Arrays.stream(messages).forEach(m -> sendSubscribeOperation.sendRx(destination, m));
- subscriber.assertValueCount(messages.length).assertNoErrors();
- verifyCheckpointSuccessCalled(messages.length);
- }
-
- @Test
- public void testSendString() {
- AssertableSubscriber subscriber = subscribe(destination, String.class).map(Message::getPayload)
- .cast(String.class)
- .test();
- sendSubscribeOperation.sendRx(destination, stringMessage);
- subscriber.assertValue(payload).assertNoErrors();
- }
-
- @Test
- public void testSendUser() {
- AssertableSubscriber subscriber = subscribe(destination, User.class).map(Message::getPayload)
- .cast(User.class)
- .test();
- sendSubscribeOperation.sendRx(destination, userMessage);
- subscriber.assertValue(user).assertNoErrors();
- }
-
- protected abstract void verifyCheckpointBatchSuccessCalled(int times);
-
- protected abstract void verifyCheckpointSuccessCalled(int times);
-
- public String getPartitionId() {
- return partitionId;
- }
-
- public void setPartitionId(String partitionId) {
- this.partitionId = partitionId;
- }
-
- public T getSendSubscribeOperation() {
- return sendSubscribeOperation;
- }
-
- public void setSendSubscribeOperation(T sendSubscribeOperation) {
- this.sendSubscribeOperation = sendSubscribeOperation;
- }
-}
diff --git a/sdk/spring/azure-spring-servicebus/pom.xml b/sdk/spring/azure-spring-servicebus/pom.xml
index 844087ec6a6b0..b73b28306a434 100644
--- a/sdk/spring/azure-spring-servicebus/pom.xml
+++ b/sdk/spring/azure-spring-servicebus/pom.xml
@@ -41,6 +41,13 @@
azure-messaging-servicebus
7.4.1