Skip to content

Commit

Permalink
[Amqp] Moving tests using VirtualTimeScheduler to dedicated test clas…
Browse files Browse the repository at this point in the history
…ses to run them in sequential and isolated mode (Azure#29492)

* Creating dedicated test class for tests using VirtualTimeScheduler for running them in sequential/isolated

* Adding license header

* Isolating ReactorReceiver test using VTS

* fixing checkstyle
  • Loading branch information
anuchandy authored and khmic5 committed Jun 19, 2022
1 parent c2eccd6 commit c580ffd
Show file tree
Hide file tree
Showing 7 changed files with 432 additions and 189 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.api.parallel.Isolated;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import reactor.test.scheduler.VirtualTimeScheduler;

import java.time.Duration;
import java.util.HashMap;

/**
* Tests for {@link AmqpChannelProcessor} using
* {@link reactor.test.scheduler.VirtualTimeScheduler} hence needs
* to run in isolated and sequential.
*/
@Execution(ExecutionMode.SAME_THREAD)
@Isolated
public class AmqpChannelProcessorIsolatedTest {
private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(30);
private final TestObject connection1 = new TestObject();

@Mock
private AmqpRetryPolicy retryPolicy;
private AmqpChannelProcessor<TestObject> channelProcessor;
private AutoCloseable mocksCloseable;

@BeforeEach
void setup() {
mocksCloseable = MockitoAnnotations.openMocks(this);
channelProcessor = new AmqpChannelProcessor<>("namespace-test", TestObject::getStates, retryPolicy, new HashMap<>());
}

@AfterEach
void teardown() throws Exception {
// Tear down any inline mocks to avoid memory leaks.
// https://github.com/mockito/mockito/wiki/What's-new-in-Mockito-2#mockito-2250
Mockito.framework().clearInlineMock(this);
if (mocksCloseable != null) {
mocksCloseable.close();
}
}

@Test
@Execution(ExecutionMode.SAME_THREAD)
void doesNotEmitConnectionWhenNotActive() {
// Arrange
final TestPublisher<TestObject> publisher = TestPublisher.createCold();

// Act & Assert
final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();
try {
StepVerifier.withVirtualTime(() -> publisher.next(connection1).flux()
.subscribeWith(channelProcessor), () -> virtualTimeScheduler, 1)
.expectSubscription()
.thenAwait(Duration.ofMinutes(10))
.expectNoEvent(Duration.ofMinutes(10))
.then(() -> connection1.getSink().next(AmqpEndpointState.UNINITIALIZED))
.expectNoEvent(Duration.ofMinutes(10))
.thenCancel()
.verify(VERIFY_TIMEOUT);
} finally {
virtualTimeScheduler.dispose();
}
}

/**
* Verifies that this AmqpChannelProcessor won't time out even if the 5 minutes default timeout occurs. This is
* possible when there is a disconnect for a long period of time.
*/
@Test
@Execution(ExecutionMode.SAME_THREAD)
void waitsLongPeriodOfTimeForConnection() {
// Arrange
final TestPublisher<TestObject> publisher = TestPublisher.createCold();

// Act & Assert
final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();
try {
StepVerifier.withVirtualTime(() -> publisher.next(connection1).flux()
.subscribeWith(channelProcessor), () -> virtualTimeScheduler, 1)
.expectSubscription()
.thenAwait(Duration.ofMinutes(10))
.then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE))
.expectNext(connection1)
.expectComplete()
.verify(VERIFY_TIMEOUT);
} finally {
virtualTimeScheduler.dispose();
}
}

/**
* Verifies that this AmqpChannelProcessor won't time out even if the 5 minutes default timeout occurs. This is
* possible when there is a disconnect for a long period of time.
*/
@Test
@Execution(ExecutionMode.SAME_THREAD)
void waitsLongPeriodOfTimeForChainedConnections() {
// Arrange
final TestPublisher<TestObject> publisher = TestPublisher.createCold();
final String contents = "Emitted something after 10 minutes.";

// Act & Assert
final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();
try {
StepVerifier.withVirtualTime(() -> {
return publisher.next(connection1).flux()
.subscribeWith(channelProcessor).flatMap(e -> Mono.just(contents));
}, () -> virtualTimeScheduler, 1)
.expectSubscription()
.thenAwait(Duration.ofMinutes(10))
.then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE))
.expectNext(contents)
.expectComplete()
.verify(VERIFY_TIMEOUT);
} finally {
virtualTimeScheduler.dispose();
}
}

static final class TestObject {
private final TestPublisher<AmqpEndpointState> processor = TestPublisher.createCold();

public Flux<AmqpEndpointState> getStates() {
return processor.flux();
}

public TestPublisher<AmqpEndpointState> getSink() {
return processor;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.mockito.MockitoAnnotations;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import reactor.test.scheduler.VirtualTimeScheduler;
Expand Down Expand Up @@ -308,73 +307,13 @@ void errorsWhenResubscribingOnTerminated() {
assertTrue(channelProcessor.isChannelClosed());
}

@Test
void doesNotEmitConnectionWhenNotActive() {
// Arrange
final TestPublisher<TestObject> publisher = TestPublisher.createCold();

// Act & Assert
StepVerifier.withVirtualTime(() -> publisher.next(connection1).flux()
.subscribeWith(channelProcessor), () -> virtualTimeScheduler, 1)
.expectSubscription()
.thenAwait(Duration.ofMinutes(10))
.expectNoEvent(Duration.ofMinutes(10))
.then(() -> connection1.getSink().next(AmqpEndpointState.UNINITIALIZED))
.expectNoEvent(Duration.ofMinutes(10))
.thenCancel()
.verify(VERIFY_TIMEOUT);
}

@Test
void requiresNonNull() {
Assertions.assertThrows(NullPointerException.class, () -> channelProcessor.onNext(null));

Assertions.assertThrows(NullPointerException.class, () -> channelProcessor.onError(null));
}

/**
* Verifies that this AmqpChannelProcessor won't time out even if the 5 minutes default timeout occurs. This is
* possible when there is a disconnect for a long period of time.
*/
@Test
void waitsLongPeriodOfTimeForConnection() {
// Arrange
final TestPublisher<TestObject> publisher = TestPublisher.createCold();

// Act & Assert
StepVerifier.withVirtualTime(() -> publisher.next(connection1).flux()
.subscribeWith(channelProcessor), () -> virtualTimeScheduler, 1)
.expectSubscription()
.thenAwait(Duration.ofMinutes(10))
.then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE))
.expectNext(connection1)
.expectComplete()
.verify(VERIFY_TIMEOUT);
}

/**
* Verifies that this AmqpChannelProcessor won't time out even if the 5 minutes default timeout occurs. This is
* possible when there is a disconnect for a long period of time.
*/
@Test
void waitsLongPeriodOfTimeForChainedConnections() {
// Arrange
final TestPublisher<TestObject> publisher = TestPublisher.createCold();
final String contents = "Emitted something after 10 minutes.";

// Act & Assert
StepVerifier.withVirtualTime(() -> {
return publisher.next(connection1).flux()
.subscribeWith(channelProcessor).flatMap(e -> Mono.just(contents));
}, () -> virtualTimeScheduler, 1)
.expectSubscription()
.thenAwait(Duration.ofMinutes(10))
.then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE))
.expectNext(contents)
.expectComplete()
.verify(VERIFY_TIMEOUT);
}

static final class TestObject {
private final TestPublisher<AmqpEndpointState> processor = TestPublisher.createCold();

Expand Down
Loading

0 comments on commit c580ffd

Please sign in to comment.