Skip to content

Commit

Permalink
SDK version 4.2.0 (Hotfix on 4.1.3): Enable a way to Unregister Messa…
Browse files Browse the repository at this point in the history
…ge Handler and Session Handler (#14021) (#15151)

* Enable a way to Unregister Message Handler and Session Handler (#14021)

* add UnregisterMessageHandler method

* Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs

Co-authored-by: Sean Feldman <SeanFeldman@users.noreply.github.com>

* Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs

Co-authored-by: Sean Feldman <SeanFeldman@users.noreply.github.com>

* Update the unregister method to be async and await for inflight operations to finish

* Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs

Co-authored-by: Sean Feldman <SeanFeldman@users.noreply.github.com>

* Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs

Co-authored-by: Sean Feldman <SeanFeldman@users.noreply.github.com>

* Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs

Co-authored-by: Sean Feldman <SeanFeldman@users.noreply.github.com>

* Change name to have async suffix and add to existing onMessageQueueTests

* Add UnregisterSessionHandlerAsync and corresponding tests

* nit

* nit

* Add a new cancellation type to not cancel inflight message handling operations when unregister is called.

* Add another type of cancellation token to session handler path

* nit

* Add a timeout parameter to unregister functions and add according unit tests

* nit

* cancel runningTaskCancellationTokenSource after unregister is done

* change public API

* update the API header

* update the API definition

* fix spacing

* fix ApproveAzureServiceBus CIT test

Co-authored-by: Sean Feldman <SeanFeldman@users.noreply.github.com>

* add changelog for nuget 4.2.0

Co-authored-by: Sean Feldman <SeanFeldman@users.noreply.github.com>
  • Loading branch information
DorothySun216 and SeanFeldman authored Sep 15, 2020
1 parent 4cb7cb7 commit dc0fda4
Show file tree
Hide file tree
Showing 19 changed files with 638 additions and 17 deletions.
3 changes: 3 additions & 0 deletions sdk/servicebus/Microsoft.Azure.ServiceBus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Release History
## 4.2.0
### Improvements
- Enable a way to Unregister Message Handler and Session Handler [PR 14021](https://github.com/Azure/azure-sdk-for-net/pull/14021)

## 4.1.3 (2020-04-17)
- Add `GetQueuesRuntimeInfoAsync`, `GetTopicsRuntimeInfoAsync` and `GetSubscriptionsRuntimeInfoAsync` to `ManagementClient` to allow retrieval of batched entity runtime information. [PR 10261](https://github.com/Azure/azure-sdk-for-net/pull/10261)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ public interface IReceiverClient : IClientEntity
/// <remarks>Enable prefetch to speed up the receive rate.</remarks>
void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, MessageHandlerOptions messageHandlerOptions);

/// <summary>
/// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion
/// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously
/// registered.
/// <param name="inflightMessageHandlerTasksWaitTimeout"> is the waitTimeout for inflight message handling tasks.
/// </summary>
Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout);

/// <summary>
/// Completes a <see cref="Message"/> using its lock token. This will delete the message from the queue.
/// </summary>
Expand Down Expand Up @@ -115,4 +123,4 @@ public interface IReceiverClient : IClientEntity
/// </remarks>
Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ public class MessageReceiver : ClientEntity, IMessageReceiver
int prefetchCount;
long lastPeekedSequenceNumber;
MessageReceivePump receivePump;
// Cancellation token to cancel the message pump. Once this is fired, all future message handling operations registered by application will be
// cancelled.
CancellationTokenSource receivePumpCancellationTokenSource;
// Cancellation token to cancel the inflight message handling operations registered by application in the message pump.
CancellationTokenSource runningTaskCancellationTokenSource;

/// <summary>
/// Creates a new MessageReceiver from a <see cref="ServiceBusConnectionStringBuilder"/>.
Expand Down Expand Up @@ -899,6 +903,51 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.OnMessageHandler(messageHandlerOptions, handler);
}

/// <summary>
/// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion
/// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously
/// registered.
/// <param name="inflightMessageHandlerTasksWaitTimeout"> is the waitTimeout for inflight message handling tasks.
/// </summary>
public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout)
{
this.ThrowIfClosed();

if (inflightMessageHandlerTasksWaitTimeout <= TimeSpan.Zero)
{
throw Fx.Exception.ArgumentOutOfRange(nameof(inflightMessageHandlerTasksWaitTimeout), inflightMessageHandlerTasksWaitTimeout, Resources.TimeoutMustBePositiveNonZero.FormatForUser(nameof(inflightMessageHandlerTasksWaitTimeout), inflightMessageHandlerTasksWaitTimeout));
}

MessagingEventSource.Log.UnregisterMessageHandlerStart(this.ClientId);
lock (this.messageReceivePumpSyncLock)
{
if (this.receivePump == null || this.receivePumpCancellationTokenSource.IsCancellationRequested)
{
// Silently return if handler has already been unregistered.
return;
}

this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
}

Stopwatch stopWatch = Stopwatch.StartNew();
while (this.receivePump != null
&& stopWatch.Elapsed < inflightMessageHandlerTasksWaitTimeout
&& this.receivePump.maxConcurrentCallsSemaphoreSlim.CurrentCount < this.receivePump.registerHandlerOptions.MaxConcurrentCalls)
{
await Task.Delay(10).ConfigureAwait(false);
}

lock (this.messageReceivePumpSyncLock)
{
this.runningTaskCancellationTokenSource.Cancel();
this.runningTaskCancellationTokenSource.Dispose();
this.receivePump = null;
}
MessagingEventSource.Log.UnregisterMessageHandlerStop(this.ClientId);
}

/// <summary>
/// Registers a <see cref="ServiceBusPlugin"/> to be used with this receiver.
/// </summary>
Expand Down Expand Up @@ -1003,6 +1052,9 @@ protected override async Task OnClosingAsync()
{
this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
// For back-compatibility
this.runningTaskCancellationTokenSource.Cancel();
this.runningTaskCancellationTokenSource.Dispose();
this.receivePump = null;
}
}
Expand Down Expand Up @@ -1279,7 +1331,13 @@ protected virtual void OnMessageHandler(
}

this.receivePumpCancellationTokenSource = new CancellationTokenSource();
this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.receivePumpCancellationTokenSource.Token);

if (this.runningTaskCancellationTokenSource == null)
{
this.runningTaskCancellationTokenSource = new CancellationTokenSource();
}

this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.receivePumpCancellationTokenSource.Token, this.runningTaskCancellationTokenSource.Token);
}

try
Expand All @@ -1295,6 +1353,8 @@ protected virtual void OnMessageHandler(
{
this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
this.runningTaskCancellationTokenSource.Cancel();
this.runningTaskCancellationTokenSource.Dispose();
this.receivePump = null;
}
}
Expand Down
8 changes: 8 additions & 0 deletions sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,13 @@ public interface IQueueClient : IReceiverClient, ISenderClient
/// <param name="sessionHandlerOptions">Options used to configure the settings of the session pump.</param>
/// <remarks>Enable prefetch to speed up the receive rate. </remarks>
void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, SessionHandlerOptions sessionHandlerOptions);

/// <summary>
/// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion
/// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously
/// registered.
/// <param name="inflightSessionHandlerTasksWaitTimeout"> is the waitTimeout for inflight session handling tasks.
/// </summary>
Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,13 @@ public interface ISubscriptionClient : IReceiverClient
/// <param name="sessionHandlerOptions">Options used to configure the settings of the session pump.</param>
/// <remarks>Enable prefetch to speed up the receive rate. </remarks>
void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, SessionHandlerOptions sessionHandlerOptions);

/// <summary>
/// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion
/// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously
/// registered.
/// <param name="inflightSessionHandlerTasksWaitTimeout"> is the waitTimeout for inflight session handling tasks.
/// </summary>
Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,28 @@ namespace Microsoft.Azure.ServiceBus

sealed class MessageReceivePump
{
public readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim;
public readonly MessageHandlerOptions registerHandlerOptions;
readonly Func<Message, CancellationToken, Task> onMessageCallback;
readonly string endpoint;
readonly MessageHandlerOptions registerHandlerOptions;
readonly IMessageReceiver messageReceiver;
readonly CancellationToken pumpCancellationToken;
readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim;
readonly CancellationToken runningTaskCancellationToken;
readonly ServiceBusDiagnosticSource diagnosticSource;

public MessageReceivePump(IMessageReceiver messageReceiver,
MessageHandlerOptions registerHandlerOptions,
Func<Message, CancellationToken, Task> callback,
Uri endpoint,
CancellationToken pumpCancellationToken)
CancellationToken pumpCancellationToken,
CancellationToken runningTaskCancellationToken)
{
this.messageReceiver = messageReceiver ?? throw new ArgumentNullException(nameof(messageReceiver));
this.registerHandlerOptions = registerHandlerOptions;
this.onMessageCallback = callback;
this.endpoint = endpoint.Authority;
this.pumpCancellationToken = pumpCancellationToken;
this.runningTaskCancellationToken = runningTaskCancellationToken;
this.maxConcurrentCallsSemaphoreSlim = new SemaphoreSlim(this.registerHandlerOptions.MaxConcurrentCalls);
this.diagnosticSource = new ServiceBusDiagnosticSource(messageReceiver.Path, endpoint);
}
Expand Down Expand Up @@ -163,7 +166,7 @@ async Task MessageDispatchTask(Message message)
try
{
MessagingEventSource.Log.MessageReceiverPumpUserCallbackStart(this.messageReceiver.ClientId, message);
await this.onMessageCallback(message, this.pumpCancellationToken).ConfigureAwait(false);
await this.onMessageCallback(message, this.runningTaskCancellationToken).ConfigureAwait(false);

MessagingEventSource.Log.MessageReceiverPumpUserCallbackStop(this.messageReceiver.ClientId, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1380,6 +1380,42 @@ public void ManagementSerializationException(string objectName, string details =
this.WriteEvent(117, objectName, details);
}
}

[Event(118, Level = EventLevel.Informational, Message = "{0}: Unregister MessageHandler start.")]
public void UnregisterMessageHandlerStart(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(118, clientId);
}
}

[Event(119, Level = EventLevel.Informational, Message = "{0}: Unregister MessageHandler done.")]
public void UnregisterMessageHandlerStop(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(119, clientId);
}
}

[Event(120, Level = EventLevel.Informational, Message = "{0}: Unregister SessionHandler start.")]
public void UnregisterSessionHandlerStart(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(120, clientId);
}
}

[Event(121, Level = EventLevel.Informational, Message = "{0}: Unregister SessionHandler done.")]
public void UnregisterSessionHandlerStop(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(121, clientId);
}
}
}

internal static class TraceHelper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<AssemblyTitle>Azure ServiceBus SDK</AssemblyTitle>
<Description>This is the next generation Azure Service Bus .NET Standard client library that focuses on queues &amp; topics. For more information about Service Bus, see https://azure.microsoft.com/en-us/services/service-bus/</Description>
<Version>4.1.3</Version>
<Version>4.2.0</Version>
<PackageTags>Microsoft;Azure;Service Bus;ServiceBus;.NET;AMQP;IoT;Queue;Topic</PackageTags>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
Expand Down
24 changes: 24 additions & 0 deletions sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,18 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions);
}

/// <summary>
/// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion
/// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously
/// registered.
/// <param name="inflightMessageHandlerTasksWaitTimeout"> is the waitTimeout for inflight message handling tasks.
/// </summary>
public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout)
{
this.ThrowIfClosed();
await this.InnerReceiver.UnregisterMessageHandlerAsync(inflightMessageHandlerTasksWaitTimeout).ConfigureAwait(false);
}

/// <summary>
/// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to receive session-messages.
/// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the queue client.
Expand Down Expand Up @@ -476,6 +488,18 @@ public void RegisterSessionHandler(Func<IMessageSession, Message, CancellationTo
this.SessionPumpHost.OnSessionHandler(handler, sessionHandlerOptions);
}

/// <summary>
/// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion
/// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously
/// registered.
/// <param name="inflightSessionHandlerTasksWaitTimeout"> is the waitTimeout for inflight session handling tasks.
/// </summary>
public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout)
{
this.ThrowIfClosed();
await this.SessionPumpHost.UnregisterSessionHandlerAsync(inflightSessionHandlerTasksWaitTimeout).ConfigureAwait(false);
}

/// <summary>
/// Schedules a message to appear on Service Bus at a later time.
/// </summary>
Expand Down
54 changes: 53 additions & 1 deletion sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

namespace Microsoft.Azure.ServiceBus
{
using Microsoft.Azure.ServiceBus.Primitives;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -12,6 +14,7 @@ internal sealed class SessionPumpHost
readonly object syncLock;
SessionReceivePump sessionReceivePump;
CancellationTokenSource sessionPumpCancellationTokenSource;
CancellationTokenSource runningTaskCancellationTokenSource;
readonly Uri endpoint;

public SessionPumpHost(string clientId, ReceiveMode receiveMode, ISessionClient sessionClient, Uri endpoint)
Expand All @@ -35,6 +38,9 @@ public void Close()
{
this.sessionPumpCancellationTokenSource?.Cancel();
this.sessionPumpCancellationTokenSource?.Dispose();
// For back-compatibility
this.runningTaskCancellationTokenSource?.Cancel();
this.runningTaskCancellationTokenSource?.Dispose();
this.sessionReceivePump = null;
}
}
Expand All @@ -53,14 +59,22 @@ public void OnSessionHandler(
}

this.sessionPumpCancellationTokenSource = new CancellationTokenSource();

// Running task cancellation token source can be reused if previously UnregisterSessionHandlerAsync was called
if (this.runningTaskCancellationTokenSource == null)
{
this.runningTaskCancellationTokenSource = new CancellationTokenSource();
}

this.sessionReceivePump = new SessionReceivePump(
this.ClientId,
this.SessionClient,
this.ReceiveMode,
sessionHandlerOptions,
callback,
this.endpoint,
this.sessionPumpCancellationTokenSource.Token);
this.sessionPumpCancellationTokenSource.Token,
this.runningTaskCancellationTokenSource.Token);
}

try
Expand All @@ -82,5 +96,43 @@ public void OnSessionHandler(

MessagingEventSource.Log.RegisterOnSessionHandlerStop(this.ClientId);
}

public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout)
{
if (inflightSessionHandlerTasksWaitTimeout <= TimeSpan.Zero)
{
throw Fx.Exception.ArgumentOutOfRange(nameof(inflightSessionHandlerTasksWaitTimeout), inflightSessionHandlerTasksWaitTimeout, Resources.TimeoutMustBePositiveNonZero.FormatForUser(nameof(inflightSessionHandlerTasksWaitTimeout), inflightSessionHandlerTasksWaitTimeout));
}

MessagingEventSource.Log.UnregisterSessionHandlerStart(this.ClientId);
lock (this.syncLock)
{
if (this.sessionReceivePump == null || this.sessionPumpCancellationTokenSource.IsCancellationRequested)
{
// Silently return if handler has already been unregistered.
return;
}

this.sessionPumpCancellationTokenSource.Cancel();
this.sessionPumpCancellationTokenSource.Dispose();
}

Stopwatch stopWatch = Stopwatch.StartNew();
while (this.sessionReceivePump != null
&& stopWatch.Elapsed < inflightSessionHandlerTasksWaitTimeout
&& (this.sessionReceivePump.maxConcurrentSessionsSemaphoreSlim.CurrentCount <
this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentSessions
|| this.sessionReceivePump.maxPendingAcceptSessionsSemaphoreSlim.CurrentCount <
this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentAcceptSessionCalls))
{
await Task.Delay(10).ConfigureAwait(false);
}

lock (this.sessionPumpCancellationTokenSource)
{
this.sessionReceivePump = null;
}
MessagingEventSource.Log.UnregisterSessionHandlerStop(this.ClientId);
}
}
}
Loading

0 comments on commit dc0fda4

Please sign in to comment.