Skip to content

Commit

Permalink
Servicebus track2 sync queue up multiple receive calls (#10940)
Browse files Browse the repository at this point in the history
SyncReceiver: Queue up the receive request and process them one at a time
  • Loading branch information
hemanttanwar authored May 22, 2020
1 parent c3f7e58 commit ab9eb78
Show file tree
Hide file tree
Showing 5 changed files with 514 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.ReceiveMode;
import reactor.core.publisher.EmitterProcessor;
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusReceiverClientBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

Expand All @@ -35,10 +35,9 @@ public final class ServiceBusReceiverClient implements AutoCloseable {
private final AtomicInteger idGenerator = new AtomicInteger();
private final ServiceBusReceiverAsyncClient asyncClient;
private final Duration operationTimeout;
private final Object lock = new Object();

private final AtomicReference<EmitterProcessor<ServiceBusReceivedMessageContext>> messageProcessor =
new AtomicReference<>();
/* To hold each receive work item to be processed.*/
private final AtomicReference<SynchronousMessageSubscriber> synchronousMessageSubscriber = new AtomicReference<>();

/**
* Creates a synchronous receiver given its asynchronous counterpart.
Expand Down Expand Up @@ -467,15 +466,18 @@ public IterableStream<ServiceBusReceivedMessageContext> receive(int maxMessages)
}

/**
* Receives an iterable stream of {@link ServiceBusReceivedMessage messages} from the Service Bus entity.
* Receives an iterable stream of {@link ServiceBusReceivedMessage messages} from the Service Bus entity. The
* default receive mode is {@link ReceiveMode#PEEK_LOCK } unless it is changed during creation of
* {@link ServiceBusReceiverClient} using {@link ServiceBusReceiverClientBuilder#receiveMode(ReceiveMode)}.
*
* @param maxMessages The maximum number of messages to receive.
* @param maxWaitTime The time the client waits for receiving a message before it times out.
* @return An {@link IterableStream} of at most {@code maxMessages} messages from the Service Bus entity.
*
* @throws IllegalArgumentException if {@code maxMessages} or {@code maxWaitTime} is zero or a negative value.
*/
public IterableStream<ServiceBusReceivedMessageContext> receive(int maxMessages, Duration maxWaitTime) {
public IterableStream<ServiceBusReceivedMessageContext> receive(int maxMessages,
Duration maxWaitTime) {
if (maxMessages <= 0) {
throw logger.logExceptionAsError(new IllegalArgumentException(
"'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages));
Expand Down Expand Up @@ -609,9 +611,9 @@ public void setSessionState(String sessionId, byte[] sessionState) {
public void close() {
asyncClient.close();

EmitterProcessor<ServiceBusReceivedMessageContext> processor = messageProcessor.getAndSet(null);
if (processor != null) {
processor.onComplete();
SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.getAndSet(null);
if (messageSubscriber != null && !messageSubscriber.isDisposed()) {
messageSubscriber.dispose();
}
}

Expand All @@ -621,22 +623,24 @@ public void close() {
*/
private void queueWork(int maximumMessageCount, Duration maxWaitTime,
FluxSink<ServiceBusReceivedMessageContext> emitter) {
synchronized (lock) {
final long id = idGenerator.getAndIncrement();
EmitterProcessor<ServiceBusReceivedMessageContext> emitterProcessor = messageProcessor.get();

final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime,
emitter);
final SynchronousMessageSubscriber syncSubscriber = new SynchronousMessageSubscriber(work);
logger.info("[{}]: Started synchronous message subscriber.", id);

if (emitterProcessor == null) {
emitterProcessor = this.asyncClient.receive()
.subscribeWith(EmitterProcessor.create(asyncClient.getReceiverOptions().getPrefetchCount(), false));
messageProcessor.set(emitterProcessor);
final long id = idGenerator.getAndIncrement();
final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime, emitter);

SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.get();
if (messageSubscriber == null) {
long prefetch = asyncClient.getReceiverOptions().getPrefetchCount();
SynchronousMessageSubscriber newSubscriber = new SynchronousMessageSubscriber(prefetch, work);

if (!synchronousMessageSubscriber.compareAndSet(null, newSubscriber)) {
newSubscriber.dispose();
SynchronousMessageSubscriber existing = synchronousMessageSubscriber.get();
existing.queueWork(work);
} else {
asyncClient.receive().subscribeWith(newSubscriber);
}

emitterProcessor.subscribe(syncSubscriber);
} else {
messageSubscriber.queueWork(work);
}
logger.verbose("[{}] Receive request queued up.", work.getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,114 +5,250 @@

import com.azure.core.util.logging.ClientLogger;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
* Subscriber that listens to events and publishes them downstream and publishes events to them in the order received.
*/
class SynchronousMessageSubscriber extends BaseSubscriber<ServiceBusReceivedMessageContext> {
private final ClientLogger logger = new ClientLogger(SynchronousMessageSubscriber.class);
private final Timer timer = new Timer();
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final SynchronousReceiveWork work;
private final AtomicInteger wip = new AtomicInteger();
private final Queue<SynchronousReceiveWork> workQueue = new ConcurrentLinkedQueue<>();
private final Queue<ServiceBusReceivedMessageContext> bufferMessages = new ConcurrentLinkedQueue<>();
private final AtomicLong remaining = new AtomicLong();

private final long requested;
private final Object currentWorkLock = new Object();

private Disposable currentTimeoutOperation;
private SynchronousReceiveWork currentWork;
private boolean subscriberInitialized;

private volatile Subscription subscription;

SynchronousMessageSubscriber(SynchronousReceiveWork work) {
this.work = Objects.requireNonNull(work, "'work' cannot be null.");
private static final AtomicReferenceFieldUpdater<SynchronousMessageSubscriber, Subscription> UPSTREAM =
AtomicReferenceFieldUpdater.newUpdater(SynchronousMessageSubscriber.class, Subscription.class,
"subscription");


SynchronousMessageSubscriber(long prefetch, SynchronousReceiveWork initialWork) {
this.workQueue.add(initialWork);
requested = initialWork.getNumberOfEvents() > prefetch ? initialWork.getNumberOfEvents() : prefetch;
}

/**
* On an initial subscription, will take the first work item, and request that amount of work for it.
*
* @param subscription Subscription for upstream.
*/
@Override
protected void hookOnSubscribe(Subscription subscription) {
if (this.subscription == null) {

if (Operators.setOnce(UPSTREAM, this, subscription)) {
this.subscription = subscription;
remaining.addAndGet(requested);
subscription.request(requested);
subscriberInitialized = true;
drain();
} else {
logger.error("Already subscribed once.");
}
}

/**
* Publishes the event to the current {@link SynchronousReceiveWork}. If that work item is complete, will dispose of
* the subscriber.
* @param message Event to publish.
*/
@Override
protected void hookOnNext(ServiceBusReceivedMessageContext message) {
bufferMessages.add(message);
drain();
}

/**
* Queue the work to be picked up by drain loop.
* @param work to be queued.
*/
void queueWork(SynchronousReceiveWork work) {

logger.info("[{}] Pending: {}, Scheduling receive timeout task '{}'.", work.getId(), work.getNumberOfEvents(),
work.getTimeout());
workQueue.add(work);

subscription.request(work.getNumberOfEvents());

timer.schedule(new ReceiveTimeoutTask(work.getId(), this::dispose), work.getTimeout().toMillis());
// Do not drain if another thread want to queue the work before we have subscriber
if (subscriberInitialized) {
drain();
}
}

/**
* Publishes the event to the current {@link SynchronousReceiveWork}. If that work item is complete, will dispose of
* the subscriber.
*
* @param value Event to publish.
* Drain the work, only one thread can be in this loop at a time.
*/
@Override
protected void hookOnNext(ServiceBusReceivedMessageContext value) {
work.next(value);
private void drain() {
// If someone is already in this loop, then we are already clearing the queue.
if (!wip.compareAndSet(0, 1)) {
return;
}

if (work.isTerminal()) {
logger.info("[{}] Completed. Closing Flux and cancelling subscription.", work.getId());
dispose();
try {
drainQueue();
} finally {
final int decremented = wip.decrementAndGet();
if (decremented != 0) {
logger.warning("There should be 0, but was: {}", decremented);
}
}
}

@Override
protected void hookOnComplete() {
logger.info("[{}] Completed. No events to listen to.", work.getId());
dispose();
/***
* Drain the queue using a lock on current work in progress.
*/
private void drainQueue() {
if (isTerminated()) {
return;
}

// Acquiring the lock
synchronized (currentWorkLock) {

// Making sure current work not become terminal since last drain queue cycle
if (currentWork != null && currentWork.isTerminal()) {
workQueue.remove(currentWork);
if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) {
currentTimeoutOperation.dispose();
}
currentTimeoutOperation = null;
}

// We should process a work when
// 1. it is first time getting picked up
// 2. or more messages have arrived while we were in drain loop.
// We might not have all the message in bufferMessages needed for workQueue, Thus we will only remove work
// from queue when we have delivered all the messages to currentWork.

while ((currentWork = workQueue.peek()) != null
&& (!currentWork.isProcessingStarted() || bufferMessages.size() > 0)) {

// Additional check for safety, but normally this work should never be terminal
if (currentWork.isTerminal()) {
// This work already finished by either timeout or no more messages to send, process next work.
workQueue.remove(currentWork);
if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) {
currentTimeoutOperation.dispose();
}
continue;
}

if (!currentWork.isProcessingStarted()) {
// timer to complete the currentWork in case of timeout trigger
currentTimeoutOperation = getTimeoutOperation(currentWork);
currentWork.startedProcessing();
}

// Send messages to currentWork from buffer
while (bufferMessages.size() > 0 && !currentWork.isTerminal()) {
currentWork.next(bufferMessages.poll());
remaining.decrementAndGet();
}

// if we have delivered all the messages to currentWork, we will complete it.
if (currentWork.isTerminal()) {
if (currentWork.getError() == null) {
currentWork.complete();
}
// Now remove from queue since it is complete
workQueue.remove(currentWork);
if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) {
currentTimeoutOperation.dispose();
}
logger.verbose("The work [{}] is complete.", currentWork.getId());
} else {
// Since this work is not complete, find out how much we should request from upstream
long creditToAdd = currentWork.getRemaining() - (remaining.get() + bufferMessages.size());
if (creditToAdd > 0) {
remaining.addAndGet(creditToAdd);
subscription.request(creditToAdd);
logger.verbose("Requesting [{}] from upstream for work [{}].", creditToAdd,
currentWork.getId());
}
}
}
}
}

/**
* @param work on which timeout thread need to start.
*
* @return {@link Disposable} for the timeout operation.
*/
private Disposable getTimeoutOperation(SynchronousReceiveWork work) {
Duration timeout = work.getTimeout();
return Mono.delay(timeout).thenReturn(work)
.subscribe(l -> {
synchronized (currentWorkLock) {
if (currentWork == work) {
work.timeout();
}
}
});
}

/**
* {@inheritDoc}
*/
@Override
protected void hookOnError(Throwable throwable) {
logger.error("[{}] Errors occurred upstream", work.getId(), throwable);
work.error(throwable);
logger.error("[{}] Errors occurred upstream", currentWork.getId(), throwable);
synchronized (currentWorkLock) {
currentWork.error(throwable);
}
dispose();
}

@Override
protected void hookOnCancel() {
dispose();
}

/**
* {@inheritDoc}
*/
@Override
public void dispose() {
if (isDisposed.getAndSet(true)) {
return;
}

work.complete();
synchronized (currentWorkLock) {
if (currentWork != null) {
currentWork.complete();
}
if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) {
currentTimeoutOperation.dispose();
}
currentTimeoutOperation = null;
}

subscription.cancel();
timer.cancel();
super.dispose();
}

private static final class ReceiveTimeoutTask extends TimerTask {
private final ClientLogger logger = new ClientLogger(ReceiveTimeoutTask.class);
private final long workId;
private final Runnable onDispose;
private boolean isTerminated() {
return isDisposed.get();
}

ReceiveTimeoutTask(long workId, Runnable onDispose) {
this.workId = workId;
this.onDispose = onDispose;
}
int getWorkQueueSize() {
return this.workQueue.size();
}

@Override
public void run() {
logger.info("[{}] Timeout encountered, disposing of subscriber.", workId);
onDispose.run();
}
long getRequested() {
return this.requested;
}
}

boolean isSubscriberInitialized() {
return this.subscriberInitialized;
}
}
Loading

0 comments on commit ab9eb78

Please sign in to comment.