From c580ffd0538d896189ec7dca4ecfc281e82c4913 Mon Sep 17 00:00:00 2001 From: Anu Thomas Chandy Date: Wed, 15 Jun 2022 16:19:51 -0700 Subject: [PATCH] [Amqp] Moving tests using VirtualTimeScheduler to dedicated test classes to run them in sequential and isolated mode (#29492) * Creating dedicated test class for tests using VirtualTimeScheduler for running them in sequential/isolated * Adding license header * Isolating ReactorReceiver test using VTS * fixing checkstyle --- .../AmqpChannelProcessorIsolatedTest.java | 147 +++++++++++++++ .../AmqpChannelProcessorTest.java | 61 ------- .../ReactorReceiverIsolatedTest.java | 172 ++++++++++++++++++ .../implementation/ReactorReceiverTest.java | 49 ----- .../implementation/ReactorSenderTest.java | 2 - .../implementation/RetryUtilIsolatedTest.java | 113 ++++++++++++ .../amqp/implementation/RetryUtilTest.java | 77 -------- 7 files changed, 432 insertions(+), 189 deletions(-) create mode 100644 sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorIsolatedTest.java create mode 100644 sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverIsolatedTest.java create mode 100644 sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilIsolatedTest.java diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorIsolatedTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorIsolatedTest.java new file mode 100644 index 0000000000000..fa4f2fa097429 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorIsolatedTest.java @@ -0,0 +1,147 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation; + +import com.azure.core.amqp.AmqpEndpointState; +import com.azure.core.amqp.AmqpRetryPolicy; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.api.parallel.Isolated; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import reactor.test.publisher.TestPublisher; +import reactor.test.scheduler.VirtualTimeScheduler; + +import java.time.Duration; +import java.util.HashMap; + +/** + * Tests for {@link AmqpChannelProcessor} using + * {@link reactor.test.scheduler.VirtualTimeScheduler} hence needs + * to run in isolated and sequential. + */ +@Execution(ExecutionMode.SAME_THREAD) +@Isolated +public class AmqpChannelProcessorIsolatedTest { + private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(30); + private final TestObject connection1 = new TestObject(); + + @Mock + private AmqpRetryPolicy retryPolicy; + private AmqpChannelProcessor channelProcessor; + private AutoCloseable mocksCloseable; + + @BeforeEach + void setup() { + mocksCloseable = MockitoAnnotations.openMocks(this); + channelProcessor = new AmqpChannelProcessor<>("namespace-test", TestObject::getStates, retryPolicy, new HashMap<>()); + } + + @AfterEach + void teardown() throws Exception { + // Tear down any inline mocks to avoid memory leaks. + // https://github.com/mockito/mockito/wiki/What's-new-in-Mockito-2#mockito-2250 + Mockito.framework().clearInlineMock(this); + if (mocksCloseable != null) { + mocksCloseable.close(); + } + } + + @Test + @Execution(ExecutionMode.SAME_THREAD) + void doesNotEmitConnectionWhenNotActive() { + // Arrange + final TestPublisher publisher = TestPublisher.createCold(); + + // Act & Assert + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); + try { + StepVerifier.withVirtualTime(() -> publisher.next(connection1).flux() + .subscribeWith(channelProcessor), () -> virtualTimeScheduler, 1) + .expectSubscription() + .thenAwait(Duration.ofMinutes(10)) + .expectNoEvent(Duration.ofMinutes(10)) + .then(() -> connection1.getSink().next(AmqpEndpointState.UNINITIALIZED)) + .expectNoEvent(Duration.ofMinutes(10)) + .thenCancel() + .verify(VERIFY_TIMEOUT); + } finally { + virtualTimeScheduler.dispose(); + } + } + + /** + * Verifies that this AmqpChannelProcessor won't time out even if the 5 minutes default timeout occurs. This is + * possible when there is a disconnect for a long period of time. + */ + @Test + @Execution(ExecutionMode.SAME_THREAD) + void waitsLongPeriodOfTimeForConnection() { + // Arrange + final TestPublisher publisher = TestPublisher.createCold(); + + // Act & Assert + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); + try { + StepVerifier.withVirtualTime(() -> publisher.next(connection1).flux() + .subscribeWith(channelProcessor), () -> virtualTimeScheduler, 1) + .expectSubscription() + .thenAwait(Duration.ofMinutes(10)) + .then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE)) + .expectNext(connection1) + .expectComplete() + .verify(VERIFY_TIMEOUT); + } finally { + virtualTimeScheduler.dispose(); + } + } + + /** + * Verifies that this AmqpChannelProcessor won't time out even if the 5 minutes default timeout occurs. This is + * possible when there is a disconnect for a long period of time. + */ + @Test + @Execution(ExecutionMode.SAME_THREAD) + void waitsLongPeriodOfTimeForChainedConnections() { + // Arrange + final TestPublisher publisher = TestPublisher.createCold(); + final String contents = "Emitted something after 10 minutes."; + + // Act & Assert + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); + try { + StepVerifier.withVirtualTime(() -> { + return publisher.next(connection1).flux() + .subscribeWith(channelProcessor).flatMap(e -> Mono.just(contents)); + }, () -> virtualTimeScheduler, 1) + .expectSubscription() + .thenAwait(Duration.ofMinutes(10)) + .then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE)) + .expectNext(contents) + .expectComplete() + .verify(VERIFY_TIMEOUT); + } finally { + virtualTimeScheduler.dispose(); + } + } + + static final class TestObject { + private final TestPublisher processor = TestPublisher.createCold(); + + public Flux getStates() { + return processor.flux(); + } + + public TestPublisher getSink() { + return processor; + } + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorTest.java index 90f3e1ef89709..c32cf7be498cc 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AmqpChannelProcessorTest.java @@ -20,7 +20,6 @@ import org.mockito.MockitoAnnotations; import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; import reactor.test.scheduler.VirtualTimeScheduler; @@ -308,23 +307,6 @@ void errorsWhenResubscribingOnTerminated() { assertTrue(channelProcessor.isChannelClosed()); } - @Test - void doesNotEmitConnectionWhenNotActive() { - // Arrange - final TestPublisher publisher = TestPublisher.createCold(); - - // Act & Assert - StepVerifier.withVirtualTime(() -> publisher.next(connection1).flux() - .subscribeWith(channelProcessor), () -> virtualTimeScheduler, 1) - .expectSubscription() - .thenAwait(Duration.ofMinutes(10)) - .expectNoEvent(Duration.ofMinutes(10)) - .then(() -> connection1.getSink().next(AmqpEndpointState.UNINITIALIZED)) - .expectNoEvent(Duration.ofMinutes(10)) - .thenCancel() - .verify(VERIFY_TIMEOUT); - } - @Test void requiresNonNull() { Assertions.assertThrows(NullPointerException.class, () -> channelProcessor.onNext(null)); @@ -332,49 +314,6 @@ void requiresNonNull() { Assertions.assertThrows(NullPointerException.class, () -> channelProcessor.onError(null)); } - /** - * Verifies that this AmqpChannelProcessor won't time out even if the 5 minutes default timeout occurs. This is - * possible when there is a disconnect for a long period of time. - */ - @Test - void waitsLongPeriodOfTimeForConnection() { - // Arrange - final TestPublisher publisher = TestPublisher.createCold(); - - // Act & Assert - StepVerifier.withVirtualTime(() -> publisher.next(connection1).flux() - .subscribeWith(channelProcessor), () -> virtualTimeScheduler, 1) - .expectSubscription() - .thenAwait(Duration.ofMinutes(10)) - .then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE)) - .expectNext(connection1) - .expectComplete() - .verify(VERIFY_TIMEOUT); - } - - /** - * Verifies that this AmqpChannelProcessor won't time out even if the 5 minutes default timeout occurs. This is - * possible when there is a disconnect for a long period of time. - */ - @Test - void waitsLongPeriodOfTimeForChainedConnections() { - // Arrange - final TestPublisher publisher = TestPublisher.createCold(); - final String contents = "Emitted something after 10 minutes."; - - // Act & Assert - StepVerifier.withVirtualTime(() -> { - return publisher.next(connection1).flux() - .subscribeWith(channelProcessor).flatMap(e -> Mono.just(contents)); - }, () -> virtualTimeScheduler, 1) - .expectSubscription() - .thenAwait(Duration.ofMinutes(10)) - .then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE)) - .expectNext(contents) - .expectComplete() - .verify(VERIFY_TIMEOUT); - } - static final class TestObject { private final TestPublisher processor = TestPublisher.createCold(); diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverIsolatedTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverIsolatedTest.java new file mode 100644 index 0000000000000..6e6746c8815ad --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverIsolatedTest.java @@ -0,0 +1,172 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation; + +import com.azure.core.amqp.AmqpConnection; +import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.core.amqp.AmqpShutdownSignal; +import com.azure.core.amqp.ClaimsBasedSecurityNode; +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpResponseCode; +import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Record; +import org.apache.qpid.proton.reactor.Reactor; +import org.apache.qpid.proton.reactor.Selectable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.api.parallel.Isolated; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import reactor.test.publisher.TestPublisher; +import reactor.test.scheduler.VirtualTimeScheduler; + +import java.io.IOException; +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link ReactorReceiver} using + * {@link reactor.test.scheduler.VirtualTimeScheduler} hence needs + * to run in isolated and sequential. + */ +@Execution(ExecutionMode.SAME_THREAD) +@Isolated +public class ReactorReceiverIsolatedTest { + private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(10); + + @Mock + private Receiver receiver; + @Mock + private ClaimsBasedSecurityNode cbsNode; + @Mock + private Event event; + @Mock + private Reactor reactor; + @Mock + private Selectable selectable; + @Mock + private Record record; + @Mock + private ReactorDispatcher reactorDispatcher; + @Mock + private AmqpConnection amqpConnection; + @Mock + private TokenManager tokenManager; + + private final TestPublisher shutdownSignals = TestPublisher.create(); + private final AmqpRetryOptions retryOptions = new AmqpRetryOptions(); + private final TestPublisher authorizationResults = TestPublisher.createCold(); + + private ReceiveLinkHandler receiverHandler; + private ReactorReceiver reactorReceiver; + private AutoCloseable mocksCloseable; + + @BeforeEach + void setup() { + mocksCloseable = MockitoAnnotations.openMocks(this); + + when(cbsNode.authorize(any(), any())).thenReturn(Mono.empty()); + + when(event.getLink()).thenReturn(receiver); + when(receiver.getRemoteSource()).thenReturn(new Source()); + + when(reactor.selectable()).thenReturn(selectable); + when(reactor.attachments()).thenReturn(record); + + final String entityPath = "test-entity-path"; + receiverHandler = new ReceiveLinkHandler("test-connection-id", "test-host", + "test-receiver-name", entityPath); + + when(tokenManager.getAuthorizationResults()).thenReturn(authorizationResults.flux()); + + when(amqpConnection.getShutdownSignals()).thenReturn(shutdownSignals.flux()); + + reactorReceiver = new ReactorReceiver(amqpConnection, entityPath, receiver, receiverHandler, tokenManager, + reactorDispatcher, retryOptions); + } + + @AfterEach + void teardown() throws Exception { + // Tear down any inline mocks to avoid memory leaks. + // https://github.com/mockito/mockito/wiki/What's-new-in-Mockito-2#mockito-2250 + Mockito.framework().clearInlineMock(this); + if (mocksCloseable != null) { + mocksCloseable.close(); + } + } + + /** + * Tests the completion of {@link ReactorReceiver#getEndpointStates()} + * when there is no link remote-close frame. + */ + @Test + @Execution(ExecutionMode.SAME_THREAD) + void endpointStateCompletesOnNoRemoteCloseAck() throws IOException { + // Arrange + final String message = "some-message"; + final AmqpErrorCondition errorCondition = AmqpErrorCondition.UNAUTHORIZED_ACCESS; + final ErrorCondition condition = new ErrorCondition(Symbol.getSymbol(errorCondition.getErrorCondition()), + "Test-users"); + + when(receiver.getLocalState()).thenReturn(EndpointState.ACTIVE); + + doAnswer(invocation -> { + // The ReactorDispatcher running localClose() work scheduled by beginClose(...). + final Runnable work = invocation.getArgument(0); + work.run(); + return null; + }).when(reactorDispatcher).invoke(any(Runnable.class)); + + // The localClose() initiated local-close via receiver.close(), here we mock scenario where + // there is no remote-close ack from the broker for the local-close, hence do nothing i.e. no + // call to receiverHandler.onLinkRemoteClose(event) + doNothing().when(receiver).close(); + + // Act & Assert + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); + + try { + StepVerifier.withVirtualTime(() -> reactorReceiver.closeAsync(message, condition), + () -> virtualTimeScheduler, 1) + // Advance virtual time beyond the default timeout of 60 sec, so endpoint state + // completion timeout kicks in. + .thenAwait(Duration.ofSeconds(100)) + .expectComplete() + .verify(VERIFY_TIMEOUT); + + // Assert + StepVerifier.create(reactorReceiver.getEndpointStates()) + // Assert endpoint state completes (via timeout) when there is no broker remote-close ack. + .expectComplete() + .verify(VERIFY_TIMEOUT); + + assertTrue(reactorReceiver.isDisposed()); + + verify(receiver).setCondition(condition); + verify(receiver).close(); + + shutdownSignals.assertNoSubscribers(); + } finally { + virtualTimeScheduler.dispose(); + } + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java index 8e0c3cc909cd7..63a324d901a4b 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java @@ -55,7 +55,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -566,54 +565,6 @@ void disposeCompletes() throws IOException { shutdownSignals.assertNoSubscribers(); } - /** - * Tests the completion of {@link ReactorReceiver#getEndpointStates()} - * when there is no link remote-close frame. - */ - @Test - void endpointStateCompletesOnNoRemoteCloseAck() throws IOException { - // Arrange - final String message = "some-message"; - final AmqpErrorCondition errorCondition = AmqpErrorCondition.UNAUTHORIZED_ACCESS; - final ErrorCondition condition = new ErrorCondition(Symbol.getSymbol(errorCondition.getErrorCondition()), - "Test-users"); - - when(receiver.getLocalState()).thenReturn(EndpointState.ACTIVE); - - doAnswer(invocation -> { - // The ReactorDispatcher running localClose() work scheduled by beginClose(...). - final Runnable work = invocation.getArgument(0); - work.run(); - return null; - }).when(reactorDispatcher).invoke(any(Runnable.class)); - - // The localClose() initiated local-close via receiver.close(), here we mock scenario where - // there is no remote-close ack from the broker for the local-close, hence do nothing i.e. no - // call to receiverHandler.onLinkRemoteClose(event) - doNothing().when(receiver).close(); - - // Act - StepVerifier.withVirtualTime(() -> reactorReceiver.closeAsync(message, condition)) - // Advance virtual time beyond the default timeout of 60 sec, so endpoint state - // completion timeout kicks in. - .thenAwait(Duration.ofSeconds(100)) - .expectComplete() - .verify(VERIFY_TIMEOUT); - - // Assert - StepVerifier.create(reactorReceiver.getEndpointStates()) - // Assert endpoint state completes (via timeout) when there is no broker remote-close ack. - .expectComplete() - .verify(VERIFY_TIMEOUT); - - assertTrue(reactorReceiver.isDisposed()); - - verify(receiver).setCondition(condition); - verify(receiver).close(); - - shutdownSignals.assertNoSubscribers(); - } - /** * Tests the completion of {@link ReactorReceiver#getEndpointStates()} * when {@link ReactorDispatcher} reject client initiated local-close. diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java index b60bbb6474f5d..8f5067c331095 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java @@ -699,8 +699,6 @@ public void sendWorkTimeout() throws IOException { }).when(reactorDispatcher).invoke(any(Runnable.class)); doAnswer(invocation -> { - System.out.println("Running send timeout work."); - final Runnable argument = invocation.getArgument(0); argument.run(); return null; diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilIsolatedTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilIsolatedTest.java new file mode 100644 index 0000000000000..c71e30b3bb058 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilIsolatedTest.java @@ -0,0 +1,113 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation; + +import com.azure.core.amqp.AmqpRetryMode; +import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.core.amqp.AmqpTransportType; +import com.azure.core.amqp.exception.AmqpErrorContext; +import com.azure.core.amqp.exception.AmqpException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.api.parallel.Isolated; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; +import reactor.test.publisher.TestPublisher; +import reactor.test.scheduler.VirtualTimeScheduler; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * This class contains tests for {@link RetryUtil} using + * {@link reactor.test.scheduler.VirtualTimeScheduler} hence needs + * to run in isolated and sequential. + */ +@Execution(ExecutionMode.SAME_THREAD) +@Isolated +public class RetryUtilIsolatedTest { + /** + * Tests a retry that times out on a Flux. + */ + @Test + @Execution(ExecutionMode.SAME_THREAD) + void withRetryFluxEmitsItemsLaterThanTimeout() { + // Arrange + final String timeoutMessage = "Operation timed out."; + final Duration timeout = Duration.ofSeconds(5); + final AmqpRetryOptions options = new AmqpRetryOptions() + .setDelay(Duration.ofSeconds(1)) + .setMaxRetries(2) + .setTryTimeout(timeout); + final Duration totalWaitTime = Duration.ofSeconds(options.getMaxRetries() * options.getDelay().getSeconds()); + + final AtomicInteger resubscribe = new AtomicInteger(); + final TestPublisher singleItem = TestPublisher.create(); + + final Flux flux = singleItem.flux() + .doOnSubscribe(s -> resubscribe.incrementAndGet()); + + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); + try { + // Act & Assert + StepVerifier.withVirtualTime(() -> RetryUtil.withRetry(flux, options, timeoutMessage), + () -> virtualTimeScheduler, 1) + .expectSubscription() + .then(() -> singleItem.next(AmqpTransportType.AMQP_WEB_SOCKETS)) + .expectNext(AmqpTransportType.AMQP_WEB_SOCKETS) + .expectNoEvent(totalWaitTime) + .thenCancel() + .verify(); + } finally { + virtualTimeScheduler.dispose(); + } + + assertEquals(1, resubscribe.get()); + } + + static Stream withNonTransientError() { + return Stream.of( + new AmqpException(false, "Test-exception", new AmqpErrorContext("test-ns")), + new IllegalStateException("Some illegal State") + ); + } + + @ParameterizedTest + @MethodSource + @Execution(ExecutionMode.SAME_THREAD) + void withNonTransientError(Throwable nonTransientError) { + // Arrange + final String timeoutMessage = "Operation timed out."; + final Duration timeout = Duration.ofSeconds(30); + final AmqpRetryOptions options = new AmqpRetryOptions() + .setMode(AmqpRetryMode.FIXED) + .setDelay(Duration.ofSeconds(1)) + .setMaxRetries(1) + .setTryTimeout(timeout); + + final Flux stream = Flux.concat( + Flux.defer(() -> Flux.just(0, 1, 2)), + Flux.defer(() -> Flux.error(nonTransientError)), + Flux.defer(() -> Flux.just(3, 4))); + + final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(true); + + // Act & Assert + try { + StepVerifier.withVirtualTime(() -> RetryUtil.withRetry(stream, options, timeoutMessage), + () -> virtualTimeScheduler, 4) + .expectNext(0, 1, 2) + .expectErrorMatches(error -> error.equals(nonTransientError)) + .verify(); + } finally { + virtualTimeScheduler.dispose(); + } + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java index 8afc01553d99a..848e996d6521d 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java @@ -20,7 +20,6 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; -import reactor.test.scheduler.VirtualTimeScheduler; import reactor.util.retry.Retry; import reactor.util.retry.RetryBackoffSpec; @@ -88,44 +87,6 @@ void withRetryFlux() { assertEquals(options.getMaxRetries() + 1, resubscribe.get()); } - /** - * Tests a retry that times out on a Flux. - */ - @Test - void withRetryFluxEmitsItemsLaterThanTimeout() { - // Arrange - final String timeoutMessage = "Operation timed out."; - final Duration timeout = Duration.ofSeconds(5); - final AmqpRetryOptions options = new AmqpRetryOptions() - .setDelay(Duration.ofSeconds(1)) - .setMaxRetries(2) - .setTryTimeout(timeout); - final Duration totalWaitTime = Duration.ofSeconds(options.getMaxRetries() * options.getDelay().getSeconds()); - - final AtomicInteger resubscribe = new AtomicInteger(); - final TestPublisher singleItem = TestPublisher.create(); - - final Flux flux = singleItem.flux() - .doOnSubscribe(s -> resubscribe.incrementAndGet()); - - final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(); - try { - // Act & Assert - StepVerifier.withVirtualTime(() -> RetryUtil.withRetry(flux, options, timeoutMessage), - () -> virtualTimeScheduler, 1) - .expectSubscription() - .then(() -> singleItem.next(AmqpTransportType.AMQP_WEB_SOCKETS)) - .expectNext(AmqpTransportType.AMQP_WEB_SOCKETS) - .expectNoEvent(totalWaitTime) - .thenCancel() - .verify(); - } finally { - virtualTimeScheduler.dispose(); - } - - assertEquals(1, resubscribe.get()); - } - /** * Tests a retry that times out on a Mono. */ @@ -197,44 +158,6 @@ void withTransientError(Throwable transientError) { .verify(); } - static Stream withNonTransientError() { - return Stream.of( - new AmqpException(false, "Test-exception", new AmqpErrorContext("test-ns")), - new IllegalStateException("Some illegal State") - ); - } - - @ParameterizedTest - @MethodSource - void withNonTransientError(Throwable nonTransientError) { - // Arrange - final String timeoutMessage = "Operation timed out."; - final Duration timeout = Duration.ofSeconds(30); - final AmqpRetryOptions options = new AmqpRetryOptions() - .setMode(AmqpRetryMode.FIXED) - .setDelay(Duration.ofSeconds(1)) - .setMaxRetries(1) - .setTryTimeout(timeout); - - final Flux stream = Flux.concat( - Flux.defer(() -> Flux.just(0, 1, 2)), - Flux.defer(() -> Flux.error(nonTransientError)), - Flux.defer(() -> Flux.just(3, 4))); - - final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(true); - - // Act & Assert - try { - StepVerifier.withVirtualTime(() -> RetryUtil.withRetry(stream, options, timeoutMessage), - () -> virtualTimeScheduler, 4) - .expectNext(0, 1, 2) - .expectErrorMatches(error -> error.equals(nonTransientError)) - .verify(); - } finally { - virtualTimeScheduler.dispose(); - } - } - static Stream createRetry() { final AmqpRetryOptions fixed = new AmqpRetryOptions() .setMode(AmqpRetryMode.FIXED)