diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java index 353c42286638..3895b3fca3cf 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java @@ -6,7 +6,6 @@ import com.azure.messaging.servicebus.implementation.MessageUtils; import com.azure.messaging.servicebus.models.LockRenewalStatus; import reactor.core.Disposable; -import reactor.core.Disposables; import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; @@ -23,13 +22,14 @@ /** * Represents a renewal session or message lock renewal operation that. */ -public class LockRenewalOperation implements AutoCloseable { +class LockRenewalOperation implements AutoCloseable { private final ClientLogger logger = new ClientLogger(LockRenewalOperation.class); private final AtomicBoolean isDisposed = new AtomicBoolean(); private final AtomicReference lockedUntil = new AtomicReference<>(); private final AtomicReference throwable = new AtomicReference<>(); private final AtomicReference status = new AtomicReference<>(LockRenewalStatus.RUNNING); private final MonoProcessor cancellationProcessor = MonoProcessor.create(); + private final Mono completionMono; private final String lockToken; private final boolean isSession; @@ -53,27 +53,55 @@ public class LockRenewalOperation implements AutoCloseable { * Creates a new lock renewal operation. * * @param lockToken Lock or session id to renew. - * @param lockedUntil The initial period the message or session is locked until. + * @param tokenLockedUntil The initial period the message or session is locked until. * @param maxLockRenewalDuration The maximum duration this lock should be renewed. * @param isSession Whether the lock represents a session lock or message lock. * @param renewalOperation The renewal operation to call. */ LockRenewalOperation(String lockToken, Duration maxLockRenewalDuration, boolean isSession, - Function> renewalOperation, OffsetDateTime lockedUntil) { + Function> renewalOperation, OffsetDateTime tokenLockedUntil) { this.lockToken = Objects.requireNonNull(lockToken, "'lockToken' cannot be null."); this.renewalOperation = Objects.requireNonNull(renewalOperation, "'renewalOperation' cannot be null."); this.isSession = isSession; - Objects.requireNonNull(lockedUntil, "'lockedUntil cannot be null.'"); + Objects.requireNonNull(tokenLockedUntil, "'lockedUntil cannot be null.'"); Objects.requireNonNull(maxLockRenewalDuration, "'maxLockRenewalDuration' cannot be null."); if (maxLockRenewalDuration.isNegative()) { - throw logger.logThrowableAsError(new IllegalArgumentException( + throw logger.logExceptionAsError(new IllegalArgumentException( "'maxLockRenewalDuration' cannot be negative.")); } - this.lockedUntil.set(lockedUntil); - this.subscription = getRenewLockOperation(lockedUntil, maxLockRenewalDuration); + this.lockedUntil.set(tokenLockedUntil); + + final Flux renewLockOperation = getRenewLockOperation(tokenLockedUntil, + maxLockRenewalDuration) + .takeUntilOther(cancellationProcessor) + .cache(Duration.ofMinutes(2)); + + this.completionMono = renewLockOperation.then(); + this.subscription = renewLockOperation.subscribe(until -> this.lockedUntil.set(until), + error -> { + logger.error("token[{}]. Error occurred while renewing lock token.", error); + status.set(LockRenewalStatus.FAILED); + throwable.set(error); + cancellationProcessor.onComplete(); + }, () -> { + if (status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.COMPLETE)) { + logger.verbose("token[{}]. Renewing session lock task completed.", lockToken); + } + + cancellationProcessor.onComplete(); + }); + } + + /** + * Gets a mono that completes when the operation does. + * + * @return A mono that completes when the renewal operation does. + */ + Mono getCompletionOperation() { + return completionMono; } /** @@ -81,7 +109,7 @@ public class LockRenewalOperation implements AutoCloseable { * * @return the datetime the message or session is locked until. */ - public OffsetDateTime getLockedUntil() { + OffsetDateTime getLockedUntil() { return lockedUntil.get(); } @@ -90,7 +118,7 @@ public OffsetDateTime getLockedUntil() { * * @return The message lock token or {@code null} if a session is being renewed instead. */ - public String getLockToken() { + String getLockToken() { return isSession ? null : lockToken; } @@ -99,7 +127,7 @@ public String getLockToken() { * * @return The session id or {@code null} if it is not a session renewal. */ - public String getSessionId() { + String getSessionId() { return isSession ? lockToken : null; } @@ -108,7 +136,7 @@ public String getSessionId() { * * @return The current status of the renewal operation. */ - public LockRenewalStatus getStatus() { + LockRenewalStatus getStatus() { return status.get(); } @@ -117,7 +145,7 @@ public LockRenewalStatus getStatus() { * * @return the exception if an error occurred whilst renewing the message or session lock, otherwise {@code null}. */ - public Throwable getThrowable() { + Throwable getThrowable() { return throwable.get(); } @@ -146,10 +174,11 @@ public void close() { * @param maxLockRenewalDuration Duration to renew lock for. * @return The subscription for the operation. */ - private Disposable getRenewLockOperation(OffsetDateTime initialLockedUntil, Duration maxLockRenewalDuration) { + private Flux getRenewLockOperation(OffsetDateTime initialLockedUntil, + Duration maxLockRenewalDuration) { if (maxLockRenewalDuration.isZero()) { status.set(LockRenewalStatus.COMPLETE); - return Disposables.single(); + return Flux.empty(); } final OffsetDateTime now = OffsetDateTime.now(); @@ -174,7 +203,6 @@ private Disposable getRenewLockOperation(OffsetDateTime initialLockedUntil, Dura sink.next(initialInterval); final Flux cancellationSignals = Flux.first(cancellationProcessor, Mono.delay(maxLockRenewalDuration)); - return Flux.switchOnNext(emitterProcessor.map(interval -> Mono.delay(interval) .thenReturn(Flux.create(s -> s.next(interval))))) .takeUntilOther(cancellationSignals) @@ -189,19 +217,6 @@ private Disposable getRenewLockOperation(OffsetDateTime initialLockedUntil, Dura sink.next(MessageUtils.adjustServerTimeout(next)); return offsetDateTime; - }) - .subscribe(until -> lockedUntil.set(until), - error -> { - logger.error("token[{}]. Error occurred while renewing lock token.", error); - status.set(LockRenewalStatus.FAILED); - throwable.set(error); - cancellationProcessor.onComplete(); - }, () -> { - if (status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.COMPLETE)) { - logger.verbose("token[{}]. Renewing session lock task completed.", lockToken); - } - - cancellationProcessor.onComplete(); - }); + }); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 8ae330e745d4..510b877f43f5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -449,7 +449,7 @@ public Mono deadLetter(ServiceBusReceivedMessage message, DeadLetterOption * @throws IllegalArgumentException if {@code lockToken} is an empty string. * @throws IllegalStateException if the receiver is a session receiver or the receiver is disposed. */ - public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration) { + public Mono getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration) { if (isDisposed.get()) { throw logger.logExceptionAsError(new IllegalStateException( String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getAutoRenewMessageLock"))); @@ -470,7 +470,8 @@ public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration m final LockRenewalOperation operation = new LockRenewalOperation(lockToken, maxLockRenewalDuration, false, this::renewMessageLock); renewalContainer.addOrUpdate(lockToken, Instant.now().plus(maxLockRenewalDuration), operation); - return operation; + + return operation.getCompletionOperation(); } /** @@ -484,7 +485,7 @@ public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration m * @throws IllegalArgumentException if {@code lockToken} is an empty string. * @throws IllegalStateException if the receiver is a non-session receiver or the receiver is disposed. */ - public LockRenewalOperation getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration) { + public Mono getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration) { if (isDisposed.get()) { throw logger.logExceptionAsError(new IllegalStateException( String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getAutoRenewSessionLock"))); @@ -506,7 +507,7 @@ public LockRenewalOperation getAutoRenewSessionLock(String sessionId, Duration m this::renewSessionLock); renewalContainer.addOrUpdate(sessionId, Instant.now().plus(maxLockRenewalDuration), operation); - return operation; + return operation.getCompletionOperation(); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index f0dc9c1ad7d9..6b68488da526 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -18,6 +18,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; /** * A synchronous receiver responsible for receiving {@link ServiceBusReceivedMessage} from a specific queue or @@ -254,13 +255,20 @@ public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions dead * * @param lockToken Lock token of the message. * @param maxLockRenewalDuration Maximum duration to keep renewing the lock token. - * @return A lock renewal operation for the message. + * @param onError A function to call when an error occurs during lock renewal. * @throws NullPointerException if {@code lockToken} or {@code maxLockRenewalDuration} is null. * @throws IllegalArgumentException if {@code lockToken} is an empty string. * @throws IllegalStateException if the receiver is a session receiver or the receiver is disposed. */ - public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration) { - return asyncClient.getAutoRenewMessageLock(lockToken, maxLockRenewalDuration); + public void getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration, + Consumer onError) { + final Consumer throwableConsumer = onError != null + ? onError + : error -> logger.warning("Exception occurred while renewing lock token: '{}'.", lockToken, error); + + asyncClient.getAutoRenewMessageLock(lockToken, maxLockRenewalDuration).subscribe( + v -> logger.verbose("Completed renewing lock token: '{}'", lockToken), + throwableConsumer); } /** @@ -268,13 +276,20 @@ public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration m * * @param sessionId Id for the session to renew. * @param maxLockRenewalDuration Maximum duration to keep renewing the lock token. - * @return A lock renewal operation for the message. + * @param onError A function to call when an error occurs during lock renewal. * @throws NullPointerException if {@code sessionId} or {@code maxLockRenewalDuration} is null. - * @throws IllegalArgumentException if {@code lockToken} is an empty string. + * @throws IllegalArgumentException if {@code sessionId} is an empty string. * @throws IllegalStateException if the receiver is a non-session receiver or the receiver is disposed. */ - public LockRenewalOperation getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration) { - return asyncClient.getAutoRenewSessionLock(sessionId, maxLockRenewalDuration); + public void getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration, + Consumer onError) { + final Consumer throwableConsumer = onError != null + ? onError + : error -> logger.warning("Exception occurred while renewing session: '{}'.", sessionId, error); + + asyncClient.getAutoRenewSessionLock(sessionId, maxLockRenewalDuration).subscribe( + v -> logger.verbose("Completed renewing session: '{}'", sessionId), + throwableConsumer); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/LockRenewalOperationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/LockRenewalOperationTest.java index 467b5d23988f..07830ed53d3c 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/LockRenewalOperationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/LockRenewalOperationTest.java @@ -14,12 +14,12 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; import java.time.Duration; import java.time.OffsetDateTime; import java.util.ArrayDeque; import java.util.Deque; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -87,7 +87,7 @@ void constructor(boolean isSession) { * Verify that when an error occurs, it is displayed. */ @Test - void errors() throws InterruptedException { + void errors() { // Arrange final boolean isSession = true; final Duration renewalPeriod = Duration.ofSeconds(2); @@ -119,7 +119,11 @@ void errors() throws InterruptedException { operation = new LockRenewalOperation(A_LOCK_TOKEN, maxDuration, isSession, renewalOperation, lockedUntil); // Act - TimeUnit.MILLISECONDS.sleep(totalSleepPeriod.toMillis()); + StepVerifier.create(operation.getCompletionOperation()) + .thenAwait(totalSleepPeriod) + .expectErrorMatches(e -> e instanceof IllegalAccessException + && e.getMessage().equals(testError.getMessage())) + .verify(); // Assert assertEquals(LockRenewalStatus.FAILED, operation.getStatus()); @@ -131,7 +135,7 @@ void errors() throws InterruptedException { * Verifies that it stops renewing after the duration has elapsed. */ @Test - void completes() throws InterruptedException { + void completes() { // Arrange final Duration maxDuration = Duration.ofSeconds(8); final Duration renewalPeriod = Duration.ofSeconds(3); @@ -147,10 +151,11 @@ void completes() throws InterruptedException { operation = new LockRenewalOperation(A_LOCK_TOKEN, maxDuration, false, renewalOperation, lockedUntil); // Act - Thread.sleep(totalSleepPeriod.toMillis()); - logger.info("Finished renewals for first sleep."); - Thread.sleep(2000); - System.out.println("Finished second sleep. Should not have any more renewals."); + StepVerifier.create(operation.getCompletionOperation()) + .thenAwait(totalSleepPeriod) + .then(() -> logger.info("Finished renewals for first sleep.")) + .expectComplete() + .verify(Duration.ofMillis(2000)); // Assert assertEquals(LockRenewalStatus.COMPLETE, operation.getStatus()); @@ -165,7 +170,7 @@ void completes() throws InterruptedException { * Verify that we can cancel the operation. */ @Test - void cancellation() throws InterruptedException { + void cancellation() { // Arrange final Duration maxDuration = Duration.ofSeconds(20); final Duration renewalPeriod = Duration.ofSeconds(3); @@ -181,12 +186,14 @@ void cancellation() throws InterruptedException { operation = new LockRenewalOperation(A_LOCK_TOKEN, maxDuration, false, renewalOperation, lockedUntil); // Act - Thread.sleep(totalSleepPeriod.toMillis()); - logger.info("Finished renewals for first sleep. Cancelling"); - operation.close(); - - Thread.sleep(2000); - System.out.println("Finished second sleep. Should not have any more renewals."); + StepVerifier.create(operation.getCompletionOperation()) + .thenAwait(totalSleepPeriod) + .then(() -> { + logger.info("Finished renewals for first sleep. Cancelling"); + operation.close(); + }) + .expectComplete() + .verify(Duration.ofMillis(1000)); // Assert assertEquals(LockRenewalStatus.CANCELLED, operation.getStatus()); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java index dbf904550cda..fddf42cc1e94 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java @@ -7,7 +7,6 @@ import com.azure.messaging.servicebus.administration.models.DeadLetterOptions; import com.azure.messaging.servicebus.implementation.DispositionStatus; import com.azure.messaging.servicebus.implementation.MessagingEntityType; -import com.azure.messaging.servicebus.models.LockRenewalStatus; import com.azure.messaging.servicebus.models.ReceiveMode; import com.azure.messaging.servicebus.models.SubQueue; import org.junit.jupiter.api.Assertions; @@ -930,37 +929,27 @@ void renewMessageLock(MessagingEntityType entityType) throws InterruptedExceptio final String messageId = UUID.randomUUID().toString(); final ServiceBusMessage message = getMessage(messageId, isSessionEnabled); - sendMessage(message).block(TIMEOUT); - - // Assert & Act - StepVerifier.create(receiver.receiveMessages()) - .assertNext(receivedContext -> { - final ServiceBusReceivedMessage receivedMessage = receivedContext.getMessage(); - assertNotNull(receivedMessage); - - final OffsetDateTime lockedUntil = receivedMessage.getLockedUntil(); - assertNotNull(lockedUntil); + final ServiceBusReceivedMessageContext receivedContext = sendMessage(message) + .then(receiver.receiveMessages().next()) + .block(TIMEOUT); + assertNotNull(receivedContext); - final LockRenewalOperation operation = receiver.getAutoRenewMessageLock( - receivedMessage.getLockToken(), maximumDuration); + final ServiceBusReceivedMessage receivedMessage = receivedContext.getMessage(); + assertNotNull(receivedMessage); - assertEquals(LockRenewalStatus.RUNNING, operation.getStatus()); - try { - Thread.sleep(sleepDuration.toMillis()); + final OffsetDateTime lockedUntil = receivedMessage.getLockedUntil(); + assertNotNull(lockedUntil); - assertTrue(lockedUntil.isBefore(operation.getLockedUntil())); - assertEquals(LockRenewalStatus.COMPLETE, operation.getStatus()); - } catch (InterruptedException e) { - logger.error("Could not sleep.", e); + // Assert & Act + StepVerifier.create(receiver.getAutoRenewMessageLock(receivedMessage.getLockToken(), maximumDuration)) + .thenAwait(sleepDuration) + .then(() -> { + logger.info("Completing message."); + int numberCompleted = completeMessages(receiver, Collections.singletonList(receivedMessage)); - operation.close(); - assertEquals(LockRenewalStatus.CANCELLED, operation.getStatus()); - } finally { - int numberCompleted = completeMessages(receiver, - Collections.singletonList(receivedMessage)); - messagesPending.addAndGet(-numberCompleted); - } - }).thenCancel() + messagesPending.addAndGet(-numberCompleted); + }) + .expectComplete() .verify(Duration.ofMinutes(3)); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index 788f6507ab03..a0e6f6d2ef81 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -22,7 +22,6 @@ import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor; import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode; import com.azure.messaging.servicebus.implementation.ServiceBusReactorReceiver; -import com.azure.messaging.servicebus.models.LockRenewalStatus; import com.azure.messaging.servicebus.models.ReceiveMode; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.transport.DeliveryState.DeliveryStateType; @@ -58,15 +57,11 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import static com.azure.messaging.servicebus.TestUtils.getMessage; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -103,7 +98,6 @@ class ServiceBusReceiverAsyncClientTest { private ServiceBusConnectionProcessor connectionProcessor; private ServiceBusReceiverAsyncClient receiver; private ServiceBusReceiverAsyncClient sessionReceiver; - private Duration maxAutoLockRenewalDuration; @Mock private ServiceBusReactorReceiver amqpReceiveLink; @@ -125,8 +119,6 @@ class ServiceBusReceiverAsyncClientTest { private ServiceBusReceivedMessage receivedMessage2; @Mock private Runnable onClientClose; - @Mock - private Function> renewalOperation; @BeforeAll static void beforeAll() { @@ -246,7 +238,7 @@ void peekWithSequenceOneMessage() { void receivesNumberOfEvents() { // Arrange final int numberOfEvents = 1; - final List messages = getMessages(10); + final List messages = getMessages(); ServiceBusReceivedMessage receivedMessage = mock(ServiceBusReceivedMessage.class); when(receivedMessage.getLockToken()).thenReturn(UUID.randomUUID().toString()); @@ -737,12 +729,11 @@ void cannotRenewMessageLockInSession() { * Verifies that we can auto-renew a message lock. */ @Test - void autoRenewMessageLock() throws InterruptedException { + void autoRenewMessageLock() { // Arrange final Duration maxDuration = Duration.ofSeconds(8); final Duration renewalPeriod = Duration.ofSeconds(3); final String lockToken = "some-token"; - final OffsetDateTime startTime = OffsetDateTime.now(); // At most 4 times because we renew the lock before it expires (by some seconds). final int atMost = 5; @@ -752,30 +743,24 @@ void autoRenewMessageLock() throws InterruptedException { .thenReturn(Mono.fromCallable(() -> Instant.now().plus(renewalPeriod))); // Act & Assert - final LockRenewalOperation operation = receiver.getAutoRenewMessageLock(lockToken, maxDuration); - Thread.sleep(totalSleepPeriod.toMillis()); - logger.info("Finished renewals for first sleep."); - - // Assert - assertEquals(LockRenewalStatus.COMPLETE, operation.getStatus()); - assertNull(operation.getThrowable()); - assertTrue(startTime.isBefore(operation.getLockedUntil()), String.format( - "initial lockedUntil[%s] is not before lockedUntil[%s]", startTime, operation.getLockedUntil())); + StepVerifier.create(receiver.getAutoRenewMessageLock(lockToken, maxDuration)) + .thenAwait(totalSleepPeriod) + .then(() -> logger.info("Finished renewals for first sleep.")) + .expectComplete() + .verify(Duration.ofSeconds(5)); verify(managementNode, Mockito.atMost(atMost)).renewMessageLock(lockToken, null); } - /** - * Verifies that we can auto-renew a message lock. + * Verifies that we can auto-renew a session lock. */ @Test - void autoRenewSessionLock() throws InterruptedException { + void autoRenewSessionLock() { // Arrange final Duration maxDuration = Duration.ofSeconds(8); final Duration renewalPeriod = Duration.ofSeconds(3); final String sessionId = "some-token"; - final OffsetDateTime startTime = OffsetDateTime.now(); // At most 4 times because we renew the lock before it expires (by some seconds). final int atMost = 5; @@ -785,23 +770,19 @@ void autoRenewSessionLock() throws InterruptedException { .thenReturn(Mono.fromCallable(() -> Instant.now().plus(renewalPeriod))); // Act & Assert - final LockRenewalOperation operation = sessionReceiver.getAutoRenewSessionLock(sessionId, maxDuration); - Thread.sleep(totalSleepPeriod.toMillis()); - logger.info("Finished renewals for first sleep."); - - // Assert - assertEquals(LockRenewalStatus.COMPLETE, operation.getStatus()); - assertNull(operation.getThrowable()); - assertTrue(startTime.isBefore(operation.getLockedUntil()), String.format( - "initial lockedUntil[%s] is not before lockedUntil[%s]", startTime, operation.getLockedUntil())); + StepVerifier.create(sessionReceiver.getAutoRenewSessionLock(sessionId, maxDuration)) + .thenAwait(totalSleepPeriod) + .then(() -> logger.info("Finished renewals for first sleep.")) + .expectComplete() + .verify(Duration.ofSeconds(5)); verify(managementNode, Mockito.atMost(atMost)).renewSessionLock(sessionId, null); } - private List getMessages(int numberOfEvents) { + private List getMessages() { final Map map = Collections.singletonMap("SAMPLE_HEADER", "foo"); - return IntStream.range(0, numberOfEvents) + return IntStream.range(0, 10) .mapToObj(index -> getMessage(PAYLOAD_BYTES, messageTrackingUUID, map)) .collect(Collectors.toList()); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java index de17dcb097c1..c200b1f0fcf5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java @@ -15,6 +15,7 @@ import org.mockito.MockitoAnnotations; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.test.publisher.TestPublisher; import java.time.Duration; import java.time.Instant; @@ -25,17 +26,21 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -59,9 +64,10 @@ class ServiceBusReceiverClientTest { private Map propertiesToModify; @Mock private ServiceBusTransactionContext transactionContext; - @Mock private ServiceBusReceivedMessage message; + @Mock + private Consumer onErrorConsumer; @BeforeEach void setup() { @@ -126,6 +132,138 @@ void abandonMessageWithProperties() { verify(asyncClient).abandon(eq(message), eq(propertiesToModify)); } + /** + * Verifies that we can auto-renew a message lock. + */ + @Test + void autoRenewMessageLock() { + // Arrange + final Duration maxDuration = Duration.ofSeconds(8); + final TestPublisher publisher = TestPublisher.create(); + + doAnswer(answer -> { + fail("On error should not have been invoked."); + return null; + }).when(onErrorConsumer).accept(any()); + when(asyncClient.getAutoRenewMessageLock(LOCK_TOKEN, maxDuration)).thenReturn(publisher.mono()); + + // Act + client.getAutoRenewMessageLock(LOCK_TOKEN, maxDuration, onErrorConsumer); + + // Assert + verify(asyncClient).getAutoRenewMessageLock(LOCK_TOKEN, maxDuration); + } + + /** + * Verifies that we can auto-renew a message lock and it calls the error consumer. + */ + @Test + void autoRenewMessageLockFails() { + // Arrange + final Duration maxDuration = Duration.ofSeconds(8); + final TestPublisher publisher = TestPublisher.create(); + final Throwable testError = new IllegalAccessException("Some exception"); + + when(asyncClient.getAutoRenewMessageLock(LOCK_TOKEN, maxDuration)).thenReturn(publisher.mono()); + + client.getAutoRenewMessageLock(LOCK_TOKEN, maxDuration, onErrorConsumer); + + // Act + publisher.error(testError); + + // Assert + verify(asyncClient).getAutoRenewMessageLock(LOCK_TOKEN, maxDuration); + verify(onErrorConsumer).accept(testError); + } + + /** + * Verifies that we can auto-renew a message lock and it will not fail with an NPE when we have a null onError. + */ + @Test + void autoRenewMessageLockFailsNull() { + // Arrange + final Duration maxDuration = Duration.ofSeconds(8); + final TestPublisher publisher = TestPublisher.create(); + final Throwable testError = new IllegalAccessException("Some exception"); + + when(asyncClient.getAutoRenewMessageLock(LOCK_TOKEN, maxDuration)).thenReturn(publisher.mono()); + + client.getAutoRenewMessageLock(LOCK_TOKEN, maxDuration, null); + + // Act + publisher.error(testError); + + // Assert + verify(asyncClient).getAutoRenewMessageLock(LOCK_TOKEN, maxDuration); + verify(onErrorConsumer, never()).accept(testError); + } + + /** + * Verifies that we can auto-renew a session lock. + */ + @Test + void autoRenewSessionLock() { + // Arrange + final Duration maxDuration = Duration.ofSeconds(8); + final TestPublisher publisher = TestPublisher.create(); + + doAnswer(answer -> { + fail("On error should not have been invoked."); + return null; + }).when(onErrorConsumer).accept(any()); + when(asyncClient.getAutoRenewSessionLock(LOCK_TOKEN, maxDuration)).thenReturn(publisher.mono()); + + // Act + client.getAutoRenewSessionLock(LOCK_TOKEN, maxDuration, onErrorConsumer); + + // Assert + verify(asyncClient).getAutoRenewSessionLock(LOCK_TOKEN, maxDuration); + } + + /** + * Verifies that we can auto-renew a session lock and it calls the error consumer. + */ + @Test + void autoRenewSessionLockFails() { + // Arrange + final Duration maxDuration = Duration.ofSeconds(8); + final TestPublisher publisher = TestPublisher.create(); + final Throwable testError = new IllegalAccessException("Some exception"); + + when(asyncClient.getAutoRenewSessionLock(LOCK_TOKEN, maxDuration)).thenReturn(publisher.mono()); + + client.getAutoRenewSessionLock(LOCK_TOKEN, maxDuration, onErrorConsumer); + + // Act + publisher.error(testError); + + // Assert + verify(asyncClient).getAutoRenewSessionLock(LOCK_TOKEN, maxDuration); + verify(onErrorConsumer).accept(testError); + } + + /** + * Verifies that we can auto-renew a message lock and it will not fail with an NPE when we have a null onError. + */ + @Test + void autoRenewSessionLockFailsNull() { + // Arrange + final Duration maxDuration = Duration.ofSeconds(8); + final TestPublisher publisher = TestPublisher.create(); + final Throwable testError = new IllegalAccessException("Some exception"); + + when(asyncClient.getAutoRenewSessionLock(LOCK_TOKEN, maxDuration)).thenReturn(publisher.mono()); + + client.getAutoRenewSessionLock(LOCK_TOKEN, maxDuration, null); + + // Act + publisher.error(testError); + + // Assert + verify(asyncClient).getAutoRenewSessionLock(LOCK_TOKEN, maxDuration); + verify(onErrorConsumer, never()).accept(testError); + } + @Test void completeMessageWithTransaction() { // Arrange @@ -318,6 +456,7 @@ void peekBatchMessagesMax() { } }); }); + when(asyncClient.peekMessages(maxMessages)).thenReturn(messages); // Act @@ -382,15 +521,13 @@ void peekBatchMessagesMaxSequenceNumber() { // Arrange final int maxMessages = 10; final long sequenceNumber = 100; - final Flux messages = Flux.create(sink -> { - sink.onRequest(number -> { - for (int i = 0; i < maxMessages; i++) { - sink.next(mock(ServiceBusReceivedMessage.class)); - } + final Flux messages = Flux.create(sink -> sink.onRequest(number -> { + for (int i = 0; i < maxMessages; i++) { + sink.next(mock(ServiceBusReceivedMessage.class)); + } - sink.complete(); - }); - }); + sink.complete(); + })); when(asyncClient.peekMessagesAt(maxMessages, sequenceNumber)).thenReturn(messages); // Act