Skip to content

Commit

Permalink
ServiceBus spring upgrade session entity lock renew overflow error v.…
Browse files Browse the repository at this point in the history
…7.1.0 19923 (Azure#19945)

* Fixed session receiver starting Lock renew for each message
  • Loading branch information
hemanttanwar authored Mar 22, 2021
1 parent bd8c7db commit 3cf166b
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 2 deletions.
4 changes: 3 additions & 1 deletion sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# Release History

## 7.2.0-beta.1 (Unreleased)

### Bug Fixes
- Fix issue [19923](https://github.com/Azure/azure-sdk-for-java/issues/19923) for session receiver only: Fix a silent
error 'java.lang.ArithmeticException: long overflow' by not starting 'LockRenewOperation' for each received message.

## 7.1.0 (2021-03-10)
### Bug Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ Flux<ServiceBusMessageContext> receiveMessagesWithContext(int highTide) {
: getOrCreateConsumer().receive().map(ServiceBusMessageContext::new);

final Flux<ServiceBusMessageContext> withAutoLockRenewal;
if (receiverOptions.isAutoLockRenewEnabled()) {
if (!receiverOptions.isSessionReceiver() && receiverOptions.isAutoLockRenewEnabled()) {
withAutoLockRenewal = new FluxAutoLockRenew(messageFlux, receiverOptions.getMaxLockRenewDuration(),
renewalContainer, this::renewMessageLock);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusReceiverClientBuilder;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.LockContainer;
import com.azure.messaging.servicebus.implementation.MessageWithLockToken;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import reactor.core.publisher.DirectProcessor;
Expand Down Expand Up @@ -293,6 +295,43 @@ void receivesNumberOfEvents() {
verify(amqpReceiveLink, never()).updateDisposition(eq(lockToken), any());
}

/**
* Verifies that session receiver does not start 'FluxAutoLockRenew' for each message because a session is already
* locked.
*/
@Test
void receivesMessageLockRenewSessionOnly() {
// Arrange
final int numberOfEvents = 1;
final List<Message> messages = getMessages();
final String lockToken = UUID.randomUUID().toString();
final Duration maxLockRenewDuration = Duration.ofMinutes(1);
ServiceBusReceiverAsyncClient mySessionReceiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE,
new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, PREFETCH, maxLockRenewDuration,
false, "Some-Session", null), connectionProcessor,
CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose);

MockedConstruction<FluxAutoLockRenew> mockedAutoLockRenew = Mockito.mockConstructionWithAnswer(FluxAutoLockRenew.class,
invocationOnMock -> new FluxAutoLockRenew(Flux.empty(), Duration.ofSeconds(30),
new LockContainer<>(Duration.ofSeconds(30)), (lock) -> Mono.empty()));

ServiceBusReceivedMessage receivedMessage = mock(ServiceBusReceivedMessage.class);
when(receivedMessage.getLockedUntil()).thenReturn(OffsetDateTime.now());
when(receivedMessage.getLockToken()).thenReturn(lockToken);

when(messageSerializer.deserialize(any(Message.class), eq(ServiceBusReceivedMessage.class)))
.thenReturn(receivedMessage);

// Act & Assert
StepVerifier.create(mySessionReceiver.receiveMessages().take(numberOfEvents))
.then(() -> messages.forEach(messageSink::next))
.expectNextCount(numberOfEvents)
.verifyComplete();

// Message onNext should not trigger `FluxAutoLockRenew` for each message because this is session entity.
Assertions.assertEquals(0, mockedAutoLockRenew.constructed().size());
}

/**
* Verifies that we error if we try to settle a message with null transaction-id.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import org.apache.qpid.proton.message.Message;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import reactor.core.publisher.EmitterProcessor;
Expand Down Expand Up @@ -219,6 +221,63 @@ void singleUnnamedSession() {
.verify(Duration.ofSeconds(45));
}

/**
* Verify that when we receive for a single, unnamed session, the session Lock renew is called once only.
*/
@Test
void singleUnnamedSessionLockRenew() {
// Arrange
ReceiverOptions receiverOptions = new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null,
1);
sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor,
tracerProvider, messageSerializer, receiverOptions);

final String sessionId = "session-1";
final String lockToken = "a-lock-token";
final String linkName = "my-link-name";
final OffsetDateTime sessionLockedUntil = OffsetDateTime.now().plus(Duration.ofSeconds(30));

final Message message = mock(Message.class);
final ServiceBusReceivedMessage receivedMessage = mock(ServiceBusReceivedMessage.class);

when(messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage);
when(receivedMessage.getSessionId()).thenReturn(sessionId);
when(receivedMessage.getLockToken()).thenReturn(lockToken);

final int numberOfMessages = 2;

when(amqpReceiveLink.getLinkName()).thenReturn(linkName);
when(amqpReceiveLink.getSessionId()).thenReturn(Mono.just(sessionId));
when(amqpReceiveLink.getSessionLockedUntil())
.thenAnswer(invocation -> Mono.just(sessionLockedUntil));
when(amqpReceiveLink.updateDisposition(lockToken, Accepted.getInstance())).thenReturn(Mono.empty());

when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ServiceBusReceiveMode.class), isNull(),
any(MessagingEntityType.class), isNull())).thenReturn(Mono.just(amqpReceiveLink));

when(managementNode.renewSessionLock(sessionId, linkName)).thenReturn(
Mono.fromCallable(() -> OffsetDateTime.now().plus(Duration.ofSeconds(5))));

MockedConstruction<LockRenewalOperation> mockedLockRenewOperation = Mockito.mockConstructionWithAnswer(LockRenewalOperation.class,
invocationOnMock -> new LockRenewalOperation("lockToken", Duration.ofSeconds(30), true,
(lock) -> Mono.empty(), OffsetDateTime.now()));

// Act & Assert
StepVerifier.create(sessionManager.receive())
.then(() -> {
for (int i = 0; i < numberOfMessages; i++) {
messageSink.next(message);
}
})
.assertNext(context -> assertMessageEquals(sessionId, receivedMessage, context))
.assertNext(context -> assertMessageEquals(sessionId, receivedMessage, context))
.thenCancel()
.verify(Duration.ofSeconds(45));

// message onNext should trigger `LockRenewalOperation` once only for one session.
Assertions.assertEquals(1, mockedLockRenewOperation.constructed().size());
}

/**
* Verify that when we receive multiple sessions, it'll change to the next session when one is complete.
*/
Expand Down

0 comments on commit 3cf166b

Please sign in to comment.