From ab9eb78600807fad73f84dee4953607b798084c4 Mon Sep 17 00:00:00 2001 From: Hemant Tanwar Date: Fri, 22 May 2020 08:56:50 -0700 Subject: [PATCH] Servicebus track2 sync queue up multiple receive calls (#10940) SyncReceiver: Queue up the receive request and process them one at a time --- .../servicebus/ServiceBusReceiverClient.java | 52 ++-- .../SynchronousMessageSubscriber.java | 242 ++++++++++++++---- .../servicebus/SynchronousReceiveWork.java | 47 +++- ...rviceBusReceiverClientIntegrationTest.java | 128 ++++++++- .../SynchronousMessageSubscriberTest.java | 131 ++++++++++ 5 files changed, 514 insertions(+), 86 deletions(-) create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SynchronousMessageSubscriberTest.java diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index 3beab123a9b5b..4fda284fb1aa0 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -8,7 +8,7 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.models.DeadLetterOptions; import com.azure.messaging.servicebus.models.ReceiveMode; -import reactor.core.publisher.EmitterProcessor; +import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusReceiverClientBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; @@ -35,10 +35,9 @@ public final class ServiceBusReceiverClient implements AutoCloseable { private final AtomicInteger idGenerator = new AtomicInteger(); private final ServiceBusReceiverAsyncClient asyncClient; private final Duration operationTimeout; - private final Object lock = new Object(); - private final AtomicReference> messageProcessor = - new AtomicReference<>(); + /* To hold each receive work item to be processed.*/ + private final AtomicReference synchronousMessageSubscriber = new AtomicReference<>(); /** * Creates a synchronous receiver given its asynchronous counterpart. @@ -467,7 +466,9 @@ public IterableStream receive(int maxMessages) } /** - * Receives an iterable stream of {@link ServiceBusReceivedMessage messages} from the Service Bus entity. + * Receives an iterable stream of {@link ServiceBusReceivedMessage messages} from the Service Bus entity. The + * default receive mode is {@link ReceiveMode#PEEK_LOCK } unless it is changed during creation of + * {@link ServiceBusReceiverClient} using {@link ServiceBusReceiverClientBuilder#receiveMode(ReceiveMode)}. * * @param maxMessages The maximum number of messages to receive. * @param maxWaitTime The time the client waits for receiving a message before it times out. @@ -475,7 +476,8 @@ public IterableStream receive(int maxMessages) * * @throws IllegalArgumentException if {@code maxMessages} or {@code maxWaitTime} is zero or a negative value. */ - public IterableStream receive(int maxMessages, Duration maxWaitTime) { + public IterableStream receive(int maxMessages, + Duration maxWaitTime) { if (maxMessages <= 0) { throw logger.logExceptionAsError(new IllegalArgumentException( "'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages)); @@ -609,9 +611,9 @@ public void setSessionState(String sessionId, byte[] sessionState) { public void close() { asyncClient.close(); - EmitterProcessor processor = messageProcessor.getAndSet(null); - if (processor != null) { - processor.onComplete(); + SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.getAndSet(null); + if (messageSubscriber != null && !messageSubscriber.isDisposed()) { + messageSubscriber.dispose(); } } @@ -621,22 +623,24 @@ public void close() { */ private void queueWork(int maximumMessageCount, Duration maxWaitTime, FluxSink emitter) { - synchronized (lock) { - final long id = idGenerator.getAndIncrement(); - EmitterProcessor emitterProcessor = messageProcessor.get(); - - final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime, - emitter); - final SynchronousMessageSubscriber syncSubscriber = new SynchronousMessageSubscriber(work); - logger.info("[{}]: Started synchronous message subscriber.", id); - - if (emitterProcessor == null) { - emitterProcessor = this.asyncClient.receive() - .subscribeWith(EmitterProcessor.create(asyncClient.getReceiverOptions().getPrefetchCount(), false)); - messageProcessor.set(emitterProcessor); + final long id = idGenerator.getAndIncrement(); + final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime, emitter); + + SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.get(); + if (messageSubscriber == null) { + long prefetch = asyncClient.getReceiverOptions().getPrefetchCount(); + SynchronousMessageSubscriber newSubscriber = new SynchronousMessageSubscriber(prefetch, work); + + if (!synchronousMessageSubscriber.compareAndSet(null, newSubscriber)) { + newSubscriber.dispose(); + SynchronousMessageSubscriber existing = synchronousMessageSubscriber.get(); + existing.queueWork(work); + } else { + asyncClient.receive().subscribeWith(newSubscriber); } - - emitterProcessor.subscribe(syncSubscriber); + } else { + messageSubscriber.queueWork(work); } + logger.verbose("[{}] Receive request queued up.", work.getId()); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java index 8600c87d0f9c1..e296fb35b60ad 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java @@ -5,67 +5,204 @@ import com.azure.core.util.logging.ClientLogger; import org.reactivestreams.Subscription; +import reactor.core.Disposable; import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Operators; -import java.util.Objects; -import java.util.Timer; -import java.util.TimerTask; +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** * Subscriber that listens to events and publishes them downstream and publishes events to them in the order received. */ class SynchronousMessageSubscriber extends BaseSubscriber { private final ClientLogger logger = new ClientLogger(SynchronousMessageSubscriber.class); - private final Timer timer = new Timer(); private final AtomicBoolean isDisposed = new AtomicBoolean(); - private final SynchronousReceiveWork work; + private final AtomicInteger wip = new AtomicInteger(); + private final Queue workQueue = new ConcurrentLinkedQueue<>(); + private final Queue bufferMessages = new ConcurrentLinkedQueue<>(); + private final AtomicLong remaining = new AtomicLong(); + + private final long requested; + private final Object currentWorkLock = new Object(); + + private Disposable currentTimeoutOperation; + private SynchronousReceiveWork currentWork; + private boolean subscriberInitialized; private volatile Subscription subscription; - SynchronousMessageSubscriber(SynchronousReceiveWork work) { - this.work = Objects.requireNonNull(work, "'work' cannot be null."); + private static final AtomicReferenceFieldUpdater UPSTREAM = + AtomicReferenceFieldUpdater.newUpdater(SynchronousMessageSubscriber.class, Subscription.class, + "subscription"); + + + SynchronousMessageSubscriber(long prefetch, SynchronousReceiveWork initialWork) { + this.workQueue.add(initialWork); + requested = initialWork.getNumberOfEvents() > prefetch ? initialWork.getNumberOfEvents() : prefetch; } /** * On an initial subscription, will take the first work item, and request that amount of work for it. - * * @param subscription Subscription for upstream. */ @Override protected void hookOnSubscribe(Subscription subscription) { - if (this.subscription == null) { + + if (Operators.setOnce(UPSTREAM, this, subscription)) { this.subscription = subscription; + remaining.addAndGet(requested); + subscription.request(requested); + subscriberInitialized = true; + drain(); + } else { + logger.error("Already subscribed once."); } + } + + /** + * Publishes the event to the current {@link SynchronousReceiveWork}. If that work item is complete, will dispose of + * the subscriber. + * @param message Event to publish. + */ + @Override + protected void hookOnNext(ServiceBusReceivedMessageContext message) { + bufferMessages.add(message); + drain(); + } + + /** + * Queue the work to be picked up by drain loop. + * @param work to be queued. + */ + void queueWork(SynchronousReceiveWork work) { logger.info("[{}] Pending: {}, Scheduling receive timeout task '{}'.", work.getId(), work.getNumberOfEvents(), work.getTimeout()); + workQueue.add(work); - subscription.request(work.getNumberOfEvents()); - - timer.schedule(new ReceiveTimeoutTask(work.getId(), this::dispose), work.getTimeout().toMillis()); + // Do not drain if another thread want to queue the work before we have subscriber + if (subscriberInitialized) { + drain(); + } } /** - * Publishes the event to the current {@link SynchronousReceiveWork}. If that work item is complete, will dispose of - * the subscriber. - * - * @param value Event to publish. + * Drain the work, only one thread can be in this loop at a time. */ - @Override - protected void hookOnNext(ServiceBusReceivedMessageContext value) { - work.next(value); + private void drain() { + // If someone is already in this loop, then we are already clearing the queue. + if (!wip.compareAndSet(0, 1)) { + return; + } - if (work.isTerminal()) { - logger.info("[{}] Completed. Closing Flux and cancelling subscription.", work.getId()); - dispose(); + try { + drainQueue(); + } finally { + final int decremented = wip.decrementAndGet(); + if (decremented != 0) { + logger.warning("There should be 0, but was: {}", decremented); + } } } - @Override - protected void hookOnComplete() { - logger.info("[{}] Completed. No events to listen to.", work.getId()); - dispose(); + /*** + * Drain the queue using a lock on current work in progress. + */ + private void drainQueue() { + if (isTerminated()) { + return; + } + + // Acquiring the lock + synchronized (currentWorkLock) { + + // Making sure current work not become terminal since last drain queue cycle + if (currentWork != null && currentWork.isTerminal()) { + workQueue.remove(currentWork); + if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) { + currentTimeoutOperation.dispose(); + } + currentTimeoutOperation = null; + } + + // We should process a work when + // 1. it is first time getting picked up + // 2. or more messages have arrived while we were in drain loop. + // We might not have all the message in bufferMessages needed for workQueue, Thus we will only remove work + // from queue when we have delivered all the messages to currentWork. + + while ((currentWork = workQueue.peek()) != null + && (!currentWork.isProcessingStarted() || bufferMessages.size() > 0)) { + + // Additional check for safety, but normally this work should never be terminal + if (currentWork.isTerminal()) { + // This work already finished by either timeout or no more messages to send, process next work. + workQueue.remove(currentWork); + if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) { + currentTimeoutOperation.dispose(); + } + continue; + } + + if (!currentWork.isProcessingStarted()) { + // timer to complete the currentWork in case of timeout trigger + currentTimeoutOperation = getTimeoutOperation(currentWork); + currentWork.startedProcessing(); + } + + // Send messages to currentWork from buffer + while (bufferMessages.size() > 0 && !currentWork.isTerminal()) { + currentWork.next(bufferMessages.poll()); + remaining.decrementAndGet(); + } + + // if we have delivered all the messages to currentWork, we will complete it. + if (currentWork.isTerminal()) { + if (currentWork.getError() == null) { + currentWork.complete(); + } + // Now remove from queue since it is complete + workQueue.remove(currentWork); + if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) { + currentTimeoutOperation.dispose(); + } + logger.verbose("The work [{}] is complete.", currentWork.getId()); + } else { + // Since this work is not complete, find out how much we should request from upstream + long creditToAdd = currentWork.getRemaining() - (remaining.get() + bufferMessages.size()); + if (creditToAdd > 0) { + remaining.addAndGet(creditToAdd); + subscription.request(creditToAdd); + logger.verbose("Requesting [{}] from upstream for work [{}].", creditToAdd, + currentWork.getId()); + } + } + } + } + } + + /** + * @param work on which timeout thread need to start. + * + * @return {@link Disposable} for the timeout operation. + */ + private Disposable getTimeoutOperation(SynchronousReceiveWork work) { + Duration timeout = work.getTimeout(); + return Mono.delay(timeout).thenReturn(work) + .subscribe(l -> { + synchronized (currentWorkLock) { + if (currentWork == work) { + work.timeout(); + } + } + }); } /** @@ -73,46 +210,45 @@ protected void hookOnComplete() { */ @Override protected void hookOnError(Throwable throwable) { - logger.error("[{}] Errors occurred upstream", work.getId(), throwable); - work.error(throwable); + logger.error("[{}] Errors occurred upstream", currentWork.getId(), throwable); + synchronized (currentWorkLock) { + currentWork.error(throwable); + } dispose(); } @Override protected void hookOnCancel() { - dispose(); - } - - /** - * {@inheritDoc} - */ - @Override - public void dispose() { if (isDisposed.getAndSet(true)) { return; } - work.complete(); + synchronized (currentWorkLock) { + if (currentWork != null) { + currentWork.complete(); + } + if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) { + currentTimeoutOperation.dispose(); + } + currentTimeoutOperation = null; + } + subscription.cancel(); - timer.cancel(); - super.dispose(); } - private static final class ReceiveTimeoutTask extends TimerTask { - private final ClientLogger logger = new ClientLogger(ReceiveTimeoutTask.class); - private final long workId; - private final Runnable onDispose; + private boolean isTerminated() { + return isDisposed.get(); + } - ReceiveTimeoutTask(long workId, Runnable onDispose) { - this.workId = workId; - this.onDispose = onDispose; - } + int getWorkQueueSize() { + return this.workQueue.size(); + } - @Override - public void run() { - logger.info("[{}] Timeout encountered, disposing of subscriber.", workId); - onDispose.run(); - } + long getRequested() { + return this.requested; } -} + boolean isSubscriberInitialized() { + return this.subscriberInitialized; + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousReceiveWork.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousReceiveWork.java index ca8939648215d..a0b557cb76834 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousReceiveWork.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousReceiveWork.java @@ -20,6 +20,12 @@ class SynchronousReceiveWork { private final Duration timeout; private final FluxSink emitter; + // Indicate state that timeout has occurred for this work. + private boolean workTimedOut = false; + + // Indicate that if processing started or not. + private boolean processingStarted; + private volatile Throwable error = null; /** @@ -66,6 +72,13 @@ int getNumberOfEvents() { return numberToReceive; } + /** + * @return remaining events to receive. + */ + int getRemaining() { + return remaining.get(); + } + /** * Gets whether or not the work item has reached a terminal state. * @@ -73,7 +86,7 @@ int getNumberOfEvents() { * false} otherwise. */ boolean isTerminal() { - return emitter.isCancelled() || remaining.get() == 0 || error != null; + return emitter.isCancelled() || remaining.get() == 0 || error != null || workTimedOut; } /** @@ -99,6 +112,15 @@ void complete() { emitter.complete(); } + /** + * Completes the publisher and sets the state to timeout. + */ + void timeout() { + logger.info("[{}]: Work timeout occurred. Completing the work.", id); + emitter.complete(); + workTimedOut = true; + } + /** * Publishes an error downstream. This is a terminal step. * @@ -108,4 +130,27 @@ void error(Throwable error) { this.error = error; emitter.error(error); } + + /** + * Returns the error object. + * @return the error. + */ + Throwable getError() { + return this.error; + } + + /** + * Indiate that processing is started for this work. + */ + void startedProcessing() { + this.processingStarted = true; + } + + /** + * + * @return flag indicting that processing is started or not. + */ + boolean isProcessingStarted() { + return this.processingStarted; + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java index b8e834a24ec22..683be0f421b9d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java @@ -12,6 +12,7 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Mono; import java.time.Duration; import java.time.Instant; @@ -29,6 +30,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; /** * Integration tests for {@link ServiceBusReceiverClient} from queues or subscriptions. @@ -99,26 +101,35 @@ protected void afterTest() { } /** - * Verifies that we can only call receive() multiple times. + * Verifies that we can only call receive() multiple times and with one of the receive does timeout. */ @MethodSource("messagingEntityWithSessions") @ParameterizedTest - void receiveByTwoSubscriber(MessagingEntityType entityType, boolean isSessionEnabled) { + void multipleReceiveByOneSubscriberMessageTimeout(MessagingEntityType entityType, boolean isSessionEnabled) { // Arrange setSenderAndReceiver(entityType, isSessionEnabled); - final int maxMessages = 1; - final int totalReceive = 3; + final int maxMessages = 2; + final int totalReceive = 2; final Duration shortTimeOut = Duration.ofSeconds(8); + final Duration longTimeOut = Duration.ofSeconds(10); final String messageId = UUID.randomUUID().toString(); - final ServiceBusMessage message = getMessage(messageId, isSessionEnabled); - + List messageList = new ArrayList<>(); for (int i = 0; i < totalReceive * maxMessages; ++i) { - sendMessage(message); + messageList.add(getMessage(messageId, isSessionEnabled)); } + Mono.just(true) + .delayElement(longTimeOut) + .map(aBoolean -> { + sendMessages(messageList); + return aBoolean; + }) + .subscribe(); // Act & Assert - IterableStream messages; + IterableStream messages = receiver.receive(maxMessages, shortTimeOut); + long received = messages.stream().count(); + assertEquals(0, received); int receivedMessageCount; int totalReceivedCount = 0; @@ -138,6 +149,102 @@ void receiveByTwoSubscriber(MessagingEntityType entityType, boolean isSessionEna assertEquals(totalReceive * maxMessages, totalReceivedCount); } + /** + * Verifies that we can only call receive() multiple times. + */ + @MethodSource("messagingEntityWithSessions") + @ParameterizedTest + void multipleReceiveByOneSubscriber(MessagingEntityType entityType, boolean isSessionEnabled) { + // Arrange + setSenderAndReceiver(entityType, isSessionEnabled); + final int maxMessagesEachReceive = 3; + final int totalReceiver = 7; + final Duration shortTimeOut = Duration.ofSeconds(8); + + final String messageId = UUID.randomUUID().toString(); + final List messageList = new ArrayList<>(); + for (int i = 0; i < totalReceiver * maxMessagesEachReceive; ++i) { + messageList.add(getMessage(messageId, isSessionEnabled)); + } + + sendMessages(messageList); + + // Act & Assert + IterableStream messages; + + int receivedMessageCount; + int totalReceivedCount = 0; + for (int i = 0; i < totalReceiver; ++i) { + messages = receiver.receive(maxMessagesEachReceive, shortTimeOut); + receivedMessageCount = 0; + for (ServiceBusReceivedMessageContext receivedMessage : messages) { + assertMessageEquals(receivedMessage, messageId, isSessionEnabled); + receiver.complete(receivedMessage.getMessage()); + messagesPending.decrementAndGet(); + ++receivedMessageCount; + } + assertEquals(maxMessagesEachReceive, receivedMessageCount); + totalReceivedCount += receivedMessageCount; + } + + assertEquals(totalReceiver * maxMessagesEachReceive, totalReceivedCount); + } + + /** + * Verifies that we can only call receive() multiple times. + */ + @MethodSource("messagingEntityWithSessions") + @ParameterizedTest + void parallelReceiveByOneSubscriber(MessagingEntityType entityType, boolean isSessionEnabled) { + // Arrange + setSenderAndReceiver(entityType, isSessionEnabled); + final int maxMessagesEachReceive = 3; + final int totalReceiver = 6; + final Duration shortTimeOut = Duration.ofSeconds(8); + + final String messageId = UUID.randomUUID().toString(); + final List messageList = new ArrayList<>(); + for (int i = 0; i < totalReceiver * maxMessagesEachReceive; ++i) { + messageList.add(getMessage(messageId, isSessionEnabled)); + } + + sendMessages(messageList); + + // Act & Assert + AtomicInteger totalReceivedMessages = new AtomicInteger(); + List receiverThreads = new ArrayList<>(); + for (int i = 0; i < totalReceiver; ++i) { + Thread thread = new Thread(() -> { + IterableStream messages1 = receiver. + receive(maxMessagesEachReceive, shortTimeOut); + int receivedMessageCount = 0; + long lastSequenceReceiver = 0; + for (ServiceBusReceivedMessageContext receivedMessage : messages1) { + assertMessageEquals(receivedMessage, messageId, isSessionEnabled); + receiver.complete(receivedMessage.getMessage()); + assertTrue(receivedMessage.getMessage().getSequenceNumber() > lastSequenceReceiver); + lastSequenceReceiver = receivedMessage.getMessage().getSequenceNumber(); + messagesPending.decrementAndGet(); + ++receivedMessageCount; + } + totalReceivedMessages.addAndGet(receivedMessageCount); + assertEquals(maxMessagesEachReceive, receivedMessageCount); + }); + receiverThreads.add(thread); + } + + receiverThreads.forEach(t -> t.start()); + + receiverThreads.forEach(t -> { + try { + t.join(); + } catch (InterruptedException e) { + fail("Error in receiving messages: " + e.getMessage()); + } + }); + assertEquals(totalReceiver * maxMessagesEachReceive, totalReceivedMessages.get()); + } + /** * Verifies that we can send and receive two messages. */ @@ -613,6 +720,11 @@ private void setSenderAndReceiver(MessagingEntityType entityType, boolean isSess .buildClient(); } } + private void sendMessages(List messageList) { + sender.send(messageList); + int number = messagesPending.getAndSet(messageList.size()); + logger.info("Number sent: {}", number); + } private void sendMessage(ServiceBusMessage message) { sender.send(message); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SynchronousMessageSubscriberTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SynchronousMessageSubscriberTest.java new file mode 100644 index 0000000000000..bd9019b956ba0 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/SynchronousMessageSubscriberTest.java @@ -0,0 +1,131 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.reactivestreams.Subscription; +import reactor.test.StepVerifier; + +import java.time.Duration; + +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; + +/** + * Unit test for sync subscriber. + */ +public class SynchronousMessageSubscriberTest { + + private static final int PREFETCH = 1; + + @Mock + private SynchronousReceiveWork work1; + + @Mock + private SynchronousReceiveWork work2; + + @Mock + private Subscription subscription; + + private SynchronousMessageSubscriber syncSybscriber; + + @BeforeAll + static void beforeAll() { + StepVerifier.setDefaultTimeout(Duration.ofSeconds(30)); + } + + @AfterAll + static void afterAll() { + StepVerifier.resetDefaultTimeout(); + } + + @BeforeEach + void setup() { + MockitoAnnotations.initMocks(this); + when(work1.getId()).thenReturn(1L); + } + + @AfterEach + void teardown() { + Mockito.framework().clearInlineMocks(); + } + + /** + * Test that if prefetch is large value, it will be the one requested. + */ + @Test + void workAddedAndLargePrefetch() { + // Arrange + when(work1.getId()).thenReturn(1L); + + // Act + syncSybscriber = new SynchronousMessageSubscriber(100, work1); + + // Assert + Assertions.assertEquals(1, syncSybscriber.getWorkQueueSize()); + Assertions.assertEquals(100, syncSybscriber.getRequested()); + + } + + /** + * Test that if prefetch is small value than work, larger value be requested. + */ + @Test + void workAddedInQueueOnCreation() { + // Arrange & Act + when(work1.getNumberOfEvents()).thenReturn(3); + syncSybscriber = new SynchronousMessageSubscriber(0, work1); + + // Assert + Assertions.assertEquals(1, syncSybscriber.getWorkQueueSize()); + Assertions.assertEquals(3, syncSybscriber.getRequested()); + + } + + /** + * A work get queued in work queue. + */ + @Test + void queueWorkTest() { + // Arrange + syncSybscriber = new SynchronousMessageSubscriber(PREFETCH, work1); + + // Act + syncSybscriber.queueWork(work2); + + // Assert + Assertions.assertEquals(2, syncSybscriber.getWorkQueueSize()); + Assertions.assertEquals(1, syncSybscriber.getRequested()); + + } + + /** + * When we call hookOnSubscribe, the sync subscriber is initialised. + */ + @Test + void hookOnSubscribeTest() { + // Arrange + syncSybscriber = new SynchronousMessageSubscriber(PREFETCH, work1); + when(work1.getTimeout()).thenReturn(Duration.ofSeconds(10)); + when(work1.isTerminal()).thenReturn(true); + doNothing().when(subscription).request(1); + + // Act + syncSybscriber.hookOnSubscribe(subscription); + + // Assert + Assertions.assertTrue(syncSybscriber.isSubscriberInitialized()); + Assertions.assertEquals(0, syncSybscriber.getWorkQueueSize()); + Assertions.assertEquals(1, syncSybscriber.getRequested()); + + } +}