diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs index 7116ce895fbaf..85a374f3b0814 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs @@ -63,6 +63,14 @@ public interface IReceiverClient : IClientEntity /// Enable prefetch to speed up the receive rate. void RegisterMessageHandler(Func handler, MessageHandlerOptions messageHandlerOptions); + /// + /// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion + /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously + /// registered. + /// is the waitTimeout for inflight message handling tasks. + /// + Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout); + /// /// Completes a using its lock token. This will delete the message from the queue. /// @@ -115,4 +123,4 @@ public interface IReceiverClient : IClientEntity /// Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null); } -} \ No newline at end of file +} diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index a55266d092c40..52b8e10a0b390 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -54,7 +54,11 @@ public class MessageReceiver : ClientEntity, IMessageReceiver int prefetchCount; long lastPeekedSequenceNumber; MessageReceivePump receivePump; + // Cancellation token to cancel the message pump. Once this is fired, all future message handling operations registered by application will be + // cancelled. CancellationTokenSource receivePumpCancellationTokenSource; + // Cancellation token to cancel the inflight message handling operations registered by application in the message pump. + CancellationTokenSource runningTaskCancellationTokenSource; /// /// Creates a new MessageReceiver from a . @@ -899,6 +903,51 @@ public void RegisterMessageHandler(Func handle this.OnMessageHandler(messageHandlerOptions, handler); } + /// + /// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion + /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously + /// registered. + /// is the waitTimeout for inflight message handling tasks. + /// + public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout) + { + this.ThrowIfClosed(); + + if (inflightMessageHandlerTasksWaitTimeout <= TimeSpan.Zero) + { + throw Fx.Exception.ArgumentOutOfRange(nameof(inflightMessageHandlerTasksWaitTimeout), inflightMessageHandlerTasksWaitTimeout, Resources.TimeoutMustBePositiveNonZero.FormatForUser(nameof(inflightMessageHandlerTasksWaitTimeout), inflightMessageHandlerTasksWaitTimeout)); + } + + MessagingEventSource.Log.UnregisterMessageHandlerStart(this.ClientId); + lock (this.messageReceivePumpSyncLock) + { + if (this.receivePump == null || this.receivePumpCancellationTokenSource.IsCancellationRequested) + { + // Silently return if handler has already been unregistered. + return; + } + + this.receivePumpCancellationTokenSource.Cancel(); + this.receivePumpCancellationTokenSource.Dispose(); + } + + Stopwatch stopWatch = Stopwatch.StartNew(); + while (this.receivePump != null + && stopWatch.Elapsed < inflightMessageHandlerTasksWaitTimeout + && this.receivePump.maxConcurrentCallsSemaphoreSlim.CurrentCount < this.receivePump.registerHandlerOptions.MaxConcurrentCalls) + { + await Task.Delay(10).ConfigureAwait(false); + } + + lock (this.messageReceivePumpSyncLock) + { + this.runningTaskCancellationTokenSource.Cancel(); + this.runningTaskCancellationTokenSource.Dispose(); + this.receivePump = null; + } + MessagingEventSource.Log.UnregisterMessageHandlerStop(this.ClientId); + } + /// /// Registers a to be used with this receiver. /// @@ -1003,6 +1052,9 @@ protected override async Task OnClosingAsync() { this.receivePumpCancellationTokenSource.Cancel(); this.receivePumpCancellationTokenSource.Dispose(); + // For back-compatibility + this.runningTaskCancellationTokenSource.Cancel(); + this.runningTaskCancellationTokenSource.Dispose(); this.receivePump = null; } } @@ -1279,7 +1331,13 @@ protected virtual void OnMessageHandler( } this.receivePumpCancellationTokenSource = new CancellationTokenSource(); - this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.receivePumpCancellationTokenSource.Token); + + if (this.runningTaskCancellationTokenSource == null) + { + this.runningTaskCancellationTokenSource = new CancellationTokenSource(); + } + + this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.receivePumpCancellationTokenSource.Token, this.runningTaskCancellationTokenSource.Token); } try @@ -1295,6 +1353,8 @@ protected virtual void OnMessageHandler( { this.receivePumpCancellationTokenSource.Cancel(); this.receivePumpCancellationTokenSource.Dispose(); + this.runningTaskCancellationTokenSource.Cancel(); + this.runningTaskCancellationTokenSource.Dispose(); this.receivePump = null; } } diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs index f45ecbd4f8c1c..8ce76dcb0c0bf 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs @@ -77,5 +77,13 @@ public interface IQueueClient : IReceiverClient, ISenderClient /// Options used to configure the settings of the session pump. /// Enable prefetch to speed up the receive rate. void RegisterSessionHandler(Func handler, SessionHandlerOptions sessionHandlerOptions); + + /// + /// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion + /// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously + /// registered. + /// is the waitTimeout for inflight session handling tasks. + /// + Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout); } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs index 0e12e751e958d..e8e19eed33762 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs @@ -114,5 +114,13 @@ public interface ISubscriptionClient : IReceiverClient /// Options used to configure the settings of the session pump. /// Enable prefetch to speed up the receive rate. void RegisterSessionHandler(Func handler, SessionHandlerOptions sessionHandlerOptions); + + /// + /// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion + /// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously + /// registered. + /// is the waitTimeout for inflight session handling tasks. + /// + Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout); } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs index f918b32e146d9..89750fd338c84 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs @@ -12,25 +12,28 @@ namespace Microsoft.Azure.ServiceBus sealed class MessageReceivePump { + public readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim; + public readonly MessageHandlerOptions registerHandlerOptions; readonly Func onMessageCallback; readonly string endpoint; - readonly MessageHandlerOptions registerHandlerOptions; readonly IMessageReceiver messageReceiver; readonly CancellationToken pumpCancellationToken; - readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim; + readonly CancellationToken runningTaskCancellationToken; readonly ServiceBusDiagnosticSource diagnosticSource; public MessageReceivePump(IMessageReceiver messageReceiver, MessageHandlerOptions registerHandlerOptions, Func callback, Uri endpoint, - CancellationToken pumpCancellationToken) + CancellationToken pumpCancellationToken, + CancellationToken runningTaskCancellationToken) { this.messageReceiver = messageReceiver ?? throw new ArgumentNullException(nameof(messageReceiver)); this.registerHandlerOptions = registerHandlerOptions; this.onMessageCallback = callback; this.endpoint = endpoint.Authority; this.pumpCancellationToken = pumpCancellationToken; + this.runningTaskCancellationToken = runningTaskCancellationToken; this.maxConcurrentCallsSemaphoreSlim = new SemaphoreSlim(this.registerHandlerOptions.MaxConcurrentCalls); this.diagnosticSource = new ServiceBusDiagnosticSource(messageReceiver.Path, endpoint); } @@ -163,7 +166,7 @@ async Task MessageDispatchTask(Message message) try { MessagingEventSource.Log.MessageReceiverPumpUserCallbackStart(this.messageReceiver.ClientId, message); - await this.onMessageCallback(message, this.pumpCancellationToken).ConfigureAwait(false); + await this.onMessageCallback(message, this.runningTaskCancellationToken).ConfigureAwait(false); MessagingEventSource.Log.MessageReceiverPumpUserCallbackStop(this.messageReceiver.ClientId, message); } diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessagingEventSource.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessagingEventSource.cs index b647b1b8b04e3..a90e6a9421852 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessagingEventSource.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessagingEventSource.cs @@ -1380,6 +1380,42 @@ public void ManagementSerializationException(string objectName, string details = this.WriteEvent(117, objectName, details); } } + + [Event(118, Level = EventLevel.Informational, Message = "{0}: Unregister MessageHandler start.")] + public void UnregisterMessageHandlerStart(string clientId) + { + if (this.IsEnabled()) + { + this.WriteEvent(118, clientId); + } + } + + [Event(119, Level = EventLevel.Informational, Message = "{0}: Unregister MessageHandler done.")] + public void UnregisterMessageHandlerStop(string clientId) + { + if (this.IsEnabled()) + { + this.WriteEvent(119, clientId); + } + } + + [Event(120, Level = EventLevel.Informational, Message = "{0}: Unregister SessionHandler start.")] + public void UnregisterSessionHandlerStart(string clientId) + { + if (this.IsEnabled()) + { + this.WriteEvent(120, clientId); + } + } + + [Event(121, Level = EventLevel.Informational, Message = "{0}: Unregister SessionHandler done.")] + public void UnregisterSessionHandlerStop(string clientId) + { + if (this.IsEnabled()) + { + this.WriteEvent(121, clientId); + } + } } internal static class TraceHelper diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs index 554ac1c9544dc..36c7e6b01bcd1 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs @@ -446,6 +446,18 @@ public void RegisterMessageHandler(Func handle this.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions); } + /// + /// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion + /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously + /// registered. + /// is the waitTimeout for inflight message handling tasks. + /// + public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout) + { + this.ThrowIfClosed(); + await this.InnerReceiver.UnregisterMessageHandlerAsync(inflightMessageHandlerTasksWaitTimeout).ConfigureAwait(false); + } + /// /// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to receive session-messages. /// This handler() is awaited on every time a new message is received by the queue client. @@ -476,6 +488,18 @@ public void RegisterSessionHandler(Func + /// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion + /// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously + /// registered. + /// is the waitTimeout for inflight session handling tasks. + /// + public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout) + { + this.ThrowIfClosed(); + await this.SessionPumpHost.UnregisterSessionHandlerAsync(inflightSessionHandlerTasksWaitTimeout).ConfigureAwait(false); + } + /// /// Schedules a message to appear on Service Bus at a later time. /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs index b3d6b8ab67c02..6f684f37b6f15 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs @@ -3,7 +3,9 @@ namespace Microsoft.Azure.ServiceBus { + using Microsoft.Azure.ServiceBus.Primitives; using System; + using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -12,6 +14,7 @@ internal sealed class SessionPumpHost readonly object syncLock; SessionReceivePump sessionReceivePump; CancellationTokenSource sessionPumpCancellationTokenSource; + CancellationTokenSource runningTaskCancellationTokenSource; readonly Uri endpoint; public SessionPumpHost(string clientId, ReceiveMode receiveMode, ISessionClient sessionClient, Uri endpoint) @@ -35,6 +38,9 @@ public void Close() { this.sessionPumpCancellationTokenSource?.Cancel(); this.sessionPumpCancellationTokenSource?.Dispose(); + // For back-compatibility + this.runningTaskCancellationTokenSource?.Cancel(); + this.runningTaskCancellationTokenSource?.Dispose(); this.sessionReceivePump = null; } } @@ -53,6 +59,13 @@ public void OnSessionHandler( } this.sessionPumpCancellationTokenSource = new CancellationTokenSource(); + + // Running task cancellation token source can be reused if previously UnregisterSessionHandlerAsync was called + if (this.runningTaskCancellationTokenSource == null) + { + this.runningTaskCancellationTokenSource = new CancellationTokenSource(); + } + this.sessionReceivePump = new SessionReceivePump( this.ClientId, this.SessionClient, @@ -60,7 +73,8 @@ public void OnSessionHandler( sessionHandlerOptions, callback, this.endpoint, - this.sessionPumpCancellationTokenSource.Token); + this.sessionPumpCancellationTokenSource.Token, + this.runningTaskCancellationTokenSource.Token); } try @@ -82,5 +96,43 @@ public void OnSessionHandler( MessagingEventSource.Log.RegisterOnSessionHandlerStop(this.ClientId); } + + public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout) + { + if (inflightSessionHandlerTasksWaitTimeout <= TimeSpan.Zero) + { + throw Fx.Exception.ArgumentOutOfRange(nameof(inflightSessionHandlerTasksWaitTimeout), inflightSessionHandlerTasksWaitTimeout, Resources.TimeoutMustBePositiveNonZero.FormatForUser(nameof(inflightSessionHandlerTasksWaitTimeout), inflightSessionHandlerTasksWaitTimeout)); + } + + MessagingEventSource.Log.UnregisterSessionHandlerStart(this.ClientId); + lock (this.syncLock) + { + if (this.sessionReceivePump == null || this.sessionPumpCancellationTokenSource.IsCancellationRequested) + { + // Silently return if handler has already been unregistered. + return; + } + + this.sessionPumpCancellationTokenSource.Cancel(); + this.sessionPumpCancellationTokenSource.Dispose(); + } + + Stopwatch stopWatch = Stopwatch.StartNew(); + while (this.sessionReceivePump != null + && stopWatch.Elapsed < inflightSessionHandlerTasksWaitTimeout + && (this.sessionReceivePump.maxConcurrentSessionsSemaphoreSlim.CurrentCount < + this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentSessions + || this.sessionReceivePump.maxPendingAcceptSessionsSemaphoreSlim.CurrentCount < + this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentAcceptSessionCalls)) + { + await Task.Delay(10).ConfigureAwait(false); + } + + lock (this.sessionPumpCancellationTokenSource) + { + this.sessionReceivePump = null; + } + MessagingEventSource.Log.UnregisterSessionHandlerStop(this.ClientId); + } } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionReceivePump.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionReceivePump.cs index 9a35b3d54d352..695511419885a 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionReceivePump.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionReceivePump.cs @@ -11,15 +11,16 @@ namespace Microsoft.Azure.ServiceBus sealed class SessionReceivePump { + public readonly SemaphoreSlim maxConcurrentSessionsSemaphoreSlim; + public readonly SemaphoreSlim maxPendingAcceptSessionsSemaphoreSlim; + public readonly SessionHandlerOptions sessionHandlerOptions; readonly string clientId; readonly ISessionClient client; readonly Func userOnSessionCallback; - readonly SessionHandlerOptions sessionHandlerOptions; readonly string endpoint; readonly string entityPath; readonly CancellationToken pumpCancellationToken; - readonly SemaphoreSlim maxConcurrentSessionsSemaphoreSlim; - readonly SemaphoreSlim maxPendingAcceptSessionsSemaphoreSlim; + readonly CancellationToken runningTaskCancellationToken; private readonly ServiceBusDiagnosticSource diagnosticSource; public SessionReceivePump(string clientId, @@ -28,7 +29,8 @@ public SessionReceivePump(string clientId, SessionHandlerOptions sessionHandlerOptions, Func callback, Uri endpoint, - CancellationToken token) + CancellationToken pumpToken, + CancellationToken runningTaskToken) { this.client = client ?? throw new ArgumentException(nameof(client)); this.clientId = clientId; @@ -37,7 +39,8 @@ public SessionReceivePump(string clientId, this.userOnSessionCallback = callback; this.endpoint = endpoint.Authority; this.entityPath = client.EntityPath; - this.pumpCancellationToken = token; + this.pumpCancellationToken = pumpToken; + this.runningTaskCancellationToken = runningTaskToken; this.maxConcurrentSessionsSemaphoreSlim = new SemaphoreSlim(this.sessionHandlerOptions.MaxConcurrentSessions); this.maxPendingAcceptSessionsSemaphoreSlim = new SemaphoreSlim(this.sessionHandlerOptions.MaxConcurrentAcceptSessionCalls); this.diagnosticSource = new ServiceBusDiagnosticSource(client.EntityPath, endpoint); @@ -229,7 +232,7 @@ async Task MessagePumpTaskAsync(IMessageSession session) var callbackExceptionOccurred = false; try { - processTask = this.userOnSessionCallback(session, message, this.pumpCancellationToken); + processTask = this.userOnSessionCallback(session, message, this.runningTaskCancellationToken); await processTask.ConfigureAwait(false); } catch (Exception exception) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs index c5d7ebeecfc1a..aa55ec86eb0b7 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs @@ -419,6 +419,18 @@ public void RegisterMessageHandler(Func handle this.InnerSubscriptionClient.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions); } + /// + /// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion + /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously + /// registered. + /// is the waitTimeout for inflight message handling tasks. + /// + public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout) + { + this.ThrowIfClosed(); + await this.InnerSubscriptionClient.InnerReceiver.UnregisterMessageHandlerAsync(inflightMessageHandlerTasksWaitTimeout).ConfigureAwait(false); + } + /// /// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to receive session-messages. /// This handler() is awaited on every time a new message is received by the subscription client. @@ -449,6 +461,18 @@ public void RegisterSessionHandler(Func + /// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion + /// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously + /// registered. + /// is the waitTimeout for inflight session handling tasks. + /// + public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout) + { + this.ThrowIfClosed(); + await this.SessionPumpHost.UnregisterSessionHandlerAsync(inflightSessionHandlerTasksWaitTimeout).ConfigureAwait(false); + } + /// /// Adds a rule to the current subscription to filter the messages reaching from topic to the subscription. /// @@ -639,4 +663,4 @@ protected override async Task OnClosingAsync() } } } -} \ No newline at end of file +} diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index 17ab0ce8b6986..f0e3b2ae9e7cf 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -113,6 +113,7 @@ namespace Microsoft.Azure.ServiceBus string QueueName { get; } void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions); void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler); + System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout); } public interface ISessionClient : Microsoft.Azure.ServiceBus.IClientEntity { @@ -132,6 +133,7 @@ namespace Microsoft.Azure.ServiceBus void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions); void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler); System.Threading.Tasks.Task RemoveRuleAsync(string ruleName); + System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout); } public interface ITopicClient : Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity { @@ -241,7 +243,9 @@ namespace Microsoft.Azure.ServiceBus public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } + public System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void UnregisterPlugin(string serviceBusPluginName) { } + public System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout) { } } public sealed class QuotaExceededException : Microsoft.Azure.ServiceBus.ServiceBusException { @@ -455,7 +459,9 @@ namespace Microsoft.Azure.ServiceBus public void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions) { } public void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler) { } public System.Threading.Tasks.Task RemoveRuleAsync(string ruleName) { } + public System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void UnregisterPlugin(string serviceBusPluginName) { } + public System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout) { } } public class TopicClient : Microsoft.Azure.ServiceBus.ClientEntity, Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity, Microsoft.Azure.ServiceBus.ITopicClient { @@ -528,6 +534,7 @@ namespace Microsoft.Azure.ServiceBus.Core System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null); void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions); void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler); + System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout); } public interface ISenderClient : Microsoft.Azure.ServiceBus.IClientEntity { @@ -580,6 +587,7 @@ namespace Microsoft.Azure.ServiceBus.Core public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public System.Threading.Tasks.Task RenewLockAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task RenewLockAsync(string lockToken) { } + public System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public class MessageSender : Microsoft.Azure.ServiceBus.ClientEntity, Microsoft.Azure.ServiceBus.Core.IMessageSender, Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageQueueTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageQueueTests.cs index ede0b52db34b8..467f1a76e45f5 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageQueueTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageQueueTests.cs @@ -132,6 +132,42 @@ await this.OnMessageAsyncTestCase( await queueClient.CloseAsync(); } }); + + await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueName => + { + var queueClient = new QueueClient(TestUtility.NamespaceConnectionString, queueName, mode); + try + { + await this.OnMessageAsyncUnregisterHandlerLongTimeoutTestCase( + queueClient.InnerSender, + queueClient.InnerReceiver, + maxConcurrentCalls, + autoComplete, + messageCount); + } + finally + { + await queueClient.CloseAsync(); + } + }); + + await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueName => + { + var queueClient = new QueueClient(TestUtility.NamespaceConnectionString, queueName, mode); + try + { + await this.OnMessageAsyncUnregisterHandlerShortTimeoutTestCase( + queueClient.InnerSender, + queueClient.InnerReceiver, + maxConcurrentCalls, + autoComplete, + messageCount); + } + finally + { + await queueClient.CloseAsync(); + } + }); } } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageTopicSubscriptionTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageTopicSubscriptionTests.cs index 251778bec6b5f..82b5110159e29 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageTopicSubscriptionTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageTopicSubscriptionTests.cs @@ -62,6 +62,56 @@ await this.OnMessageAsyncTestCase( await topicClient.CloseAsync(); } }); + + await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicName, subscriptionName) => + { + var topicClient = new TopicClient(TestUtility.NamespaceConnectionString, topicName); + var subscriptionClient = new SubscriptionClient( + TestUtility.NamespaceConnectionString, + topicName, + subscriptionName, + mode); + + try + { + await this.OnMessageAsyncUnregisterHandlerLongTimeoutTestCase( + topicClient.InnerSender, + subscriptionClient.InnerSubscriptionClient.InnerReceiver, + maxConcurrentCalls, + autoComplete, + messageCount); + } + finally + { + await subscriptionClient.CloseAsync(); + await topicClient.CloseAsync(); + } + }); + + await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicName, subscriptionName) => + { + var topicClient = new TopicClient(TestUtility.NamespaceConnectionString, topicName); + var subscriptionClient = new SubscriptionClient( + TestUtility.NamespaceConnectionString, + topicName, + subscriptionName, + mode); + + try + { + await this.OnMessageAsyncUnregisterHandlerShortTimeoutTestCase( + topicClient.InnerSender, + subscriptionClient.InnerSubscriptionClient.InnerReceiver, + maxConcurrentCalls, + autoComplete, + messageCount); + } + finally + { + await subscriptionClient.CloseAsync(); + await topicClient.CloseAsync(); + } + }); } } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs index 42da0e7ce4c48..9380a51d426f3 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs @@ -6,6 +6,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests using System; using System.Collections.Generic; using System.Diagnostics; + using System.Threading; using System.Threading.Tasks; using Xunit; @@ -186,6 +187,114 @@ await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueNa // Verify messages were received. await testSessionHandler.VerifyRun(); + + testSessionHandler.ClearData(); + } + finally + { + await queueClient.CloseAsync(); + } + }); + + // test UnregisterSessionHandler can wait for message handling upto the timeout user defined. + await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueName => + { + TestUtility.Log($"Queue: {queueName}, MaxConcurrentCalls: {maxConcurrentCalls}, Receive Mode: {mode.ToString()}, AutoComplete: {autoComplete}"); + var queueClient = new QueueClient(TestUtility.NamespaceConnectionString, queueName, mode); + try + { + var sessionHandlerOptions = + new SessionHandlerOptions(ExceptionReceivedHandler) + { + MaxConcurrentSessions = maxConcurrentCalls, + MessageWaitTimeout = TimeSpan.FromSeconds(5), + AutoComplete = autoComplete + }; + + var testSessionHandler = new TestSessionHandler( + queueClient.ReceiveMode, + sessionHandlerOptions, + queueClient.InnerSender, + queueClient.SessionPumpHost); + + // Send messages to Session first + await testSessionHandler.SendSessionMessages(); + + // Register handler + var count = 0; + testSessionHandler.RegisterSessionHandler( + async (session, message, token) => + { + await Task.Delay(TimeSpan.FromSeconds(8)); + TestUtility.Log($"Received Session: {session.SessionId} message: SequenceNumber: {message.SystemProperties.SequenceNumber}"); + + if (queueClient.ReceiveMode == ReceiveMode.PeekLock && !sessionHandlerOptions.AutoComplete) + { + await session.CompleteAsync(message.SystemProperties.LockToken); + } + Interlocked.Increment(ref count); + }, + sessionHandlerOptions); + + await Task.Delay(TimeSpan.FromSeconds(2)); + // UnregisterSessionHandler should wait up to the provided timeout to finish the message handling tasks + await testSessionHandler.UnregisterSessionHandler(TimeSpan.FromSeconds(10)); + Assert.True(count == maxConcurrentCalls); + + testSessionHandler.ClearData(); + } + finally + { + await queueClient.CloseAsync(); + } + }); + + // test UnregisterSessionHandler can close down in time when message handling takes longer than wait timeout user defined. + await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueName => + { + TestUtility.Log($"Queue: {queueName}, MaxConcurrentCalls: {maxConcurrentCalls}, Receive Mode: {mode.ToString()}, AutoComplete: {autoComplete}"); + var queueClient = new QueueClient(TestUtility.NamespaceConnectionString, queueName, mode); + try + { + var sessionHandlerOptions = + new SessionHandlerOptions(ExceptionReceivedHandler) + { + MaxConcurrentSessions = maxConcurrentCalls, + MessageWaitTimeout = TimeSpan.FromSeconds(5), + AutoComplete = autoComplete + }; + + var testSessionHandler = new TestSessionHandler( + queueClient.ReceiveMode, + sessionHandlerOptions, + queueClient.InnerSender, + queueClient.SessionPumpHost); + + // Send messages to Session first + await testSessionHandler.SendSessionMessages(); + + // Register handler + var count = 0; + testSessionHandler.RegisterSessionHandler( + async (session, message, token) => + { + await Task.Delay(TimeSpan.FromSeconds(8)); + TestUtility.Log($"Received Session: {session.SessionId} message: SequenceNumber: {message.SystemProperties.SequenceNumber}"); + + if (queueClient.ReceiveMode == ReceiveMode.PeekLock && !sessionHandlerOptions.AutoComplete) + { + await session.CompleteAsync(message.SystemProperties.LockToken); + } + Interlocked.Increment(ref count); + }, + sessionHandlerOptions); + + await Task.Delay(TimeSpan.FromSeconds(2)); + // UnregisterSessionHandler should wait up to the provided timeout to finish the message handling tasks + await testSessionHandler.UnregisterSessionHandler(TimeSpan.FromSeconds(2)); + Assert.True(count == 0); + + testSessionHandler.ClearData(); } finally { diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs index 161836250b289..a2eab7f9a65c6 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs @@ -6,6 +6,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests using System; using System.Collections.Generic; using System.Diagnostics; + using System.Threading; using System.Threading.Tasks; using Xunit; @@ -110,7 +111,7 @@ await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicN var sessionHandlerOptions = new SessionHandlerOptions(ExceptionReceivedHandler) { - MaxConcurrentSessions = 5, + MaxConcurrentSessions = maxConcurrentCalls, MessageWaitTimeout = TimeSpan.FromSeconds(5), AutoComplete = true }; @@ -129,6 +130,128 @@ await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicN // Verify messages were received. await testSessionHandler.VerifyRun(); + + testSessionHandler.ClearData(); + } + finally + { + await subscriptionClient.CloseAsync(); + await topicClient.CloseAsync(); + } + }); + + // test UnregisterSessionHandler can wait for message handling upto the timeout user defined. + await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicName, subscriptionName) => + { + TestUtility.Log($"Topic: {topicName}, MaxConcurrentCalls: {maxConcurrentCalls}, Receive Mode: {mode.ToString()}, AutoComplete: {autoComplete}"); + var topicClient = new TopicClient(TestUtility.NamespaceConnectionString, topicName); + var subscriptionClient = new SubscriptionClient( + TestUtility.NamespaceConnectionString, + topicClient.TopicName, + subscriptionName, + ReceiveMode.PeekLock); + + try + { + var sessionHandlerOptions = + new SessionHandlerOptions(ExceptionReceivedHandler) + { + MaxConcurrentSessions = maxConcurrentCalls, + MessageWaitTimeout = TimeSpan.FromSeconds(5), + AutoComplete = true + }; + + var testSessionHandler = new TestSessionHandler( + subscriptionClient.ReceiveMode, + sessionHandlerOptions, + topicClient.InnerSender, + subscriptionClient.SessionPumpHost); + + // Send messages to Session + await testSessionHandler.SendSessionMessages(); + + // Register handler + var count = 0; + testSessionHandler.RegisterSessionHandler( + async (session, message, token) => + { + await Task.Delay(TimeSpan.FromSeconds(8)); + TestUtility.Log($"Received Session: {session.SessionId} message: SequenceNumber: {message.SystemProperties.SequenceNumber}"); + + if (subscriptionClient.ReceiveMode == ReceiveMode.PeekLock && !sessionHandlerOptions.AutoComplete) + { + await session.CompleteAsync(message.SystemProperties.LockToken); + } + Interlocked.Increment(ref count); + }, + sessionHandlerOptions); + + await Task.Delay(TimeSpan.FromSeconds(2)); + // UnregisterSessionHandler should wait up to the provided timeout to finish the message handling tasks + await testSessionHandler.UnregisterSessionHandler(TimeSpan.FromSeconds(10)); + Assert.True(count == maxConcurrentCalls); + + testSessionHandler.ClearData(); + } + finally + { + await subscriptionClient.CloseAsync(); + await topicClient.CloseAsync(); + } + }); + + // test UnregisterSessionHandler can close down in time when message handling takes longer than wait timeout user defined. + await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicName, subscriptionName) => + { + TestUtility.Log($"Topic: {topicName}, MaxConcurrentCalls: {maxConcurrentCalls}, Receive Mode: {mode.ToString()}, AutoComplete: {autoComplete}"); + var topicClient = new TopicClient(TestUtility.NamespaceConnectionString, topicName); + var subscriptionClient = new SubscriptionClient( + TestUtility.NamespaceConnectionString, + topicClient.TopicName, + subscriptionName, + ReceiveMode.PeekLock); + + try + { + var sessionHandlerOptions = + new SessionHandlerOptions(ExceptionReceivedHandler) + { + MaxConcurrentSessions = maxConcurrentCalls, + MessageWaitTimeout = TimeSpan.FromSeconds(5), + AutoComplete = true + }; + + var testSessionHandler = new TestSessionHandler( + subscriptionClient.ReceiveMode, + sessionHandlerOptions, + topicClient.InnerSender, + subscriptionClient.SessionPumpHost); + + // Send messages to Session + await testSessionHandler.SendSessionMessages(); + + // Register handler + var count = 0; + testSessionHandler.RegisterSessionHandler( + async (session, message, token) => + { + await Task.Delay(TimeSpan.FromSeconds(8)); + TestUtility.Log($"Received Session: {session.SessionId} message: SequenceNumber: {message.SystemProperties.SequenceNumber}"); + + if (subscriptionClient.ReceiveMode == ReceiveMode.PeekLock && !sessionHandlerOptions.AutoComplete) + { + await session.CompleteAsync(message.SystemProperties.LockToken); + } + Interlocked.Increment(ref count); + }, + sessionHandlerOptions); + + await Task.Delay(TimeSpan.FromSeconds(2)); + // UnregisterSessionHandler should wait up to the provided timeout to finish the message handling tasks + await testSessionHandler.UnregisterSessionHandler(TimeSpan.FromSeconds(2)); + Assert.True(count == 0); + + testSessionHandler.ClearData(); } finally { diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs index 2430b0d7b18c0..9bd6244d8a9c0 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs @@ -253,11 +253,11 @@ internal async Task OnMessageAsyncTestCase( async (message, token) => { TestUtility.Log($"Received message: SequenceNumber: {message.SystemProperties.SequenceNumber}"); - Interlocked.Increment(ref count); if (messageReceiver.ReceiveMode == ReceiveMode.PeekLock && !autoComplete) { await messageReceiver.CompleteAsync(message.SystemProperties.LockToken); } + Interlocked.Increment(ref count); }, new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = maxConcurrentCalls, AutoComplete = autoComplete }); @@ -275,6 +275,62 @@ internal async Task OnMessageAsyncTestCase( Assert.True(count == messageCount); } + internal async Task OnMessageAsyncUnregisterHandlerLongTimeoutTestCase( + IMessageSender messageSender, + IMessageReceiver messageReceiver, + int maxConcurrentCalls, + bool autoComplete, + int messageCount) + { + var count = 0; + await TestUtility.SendMessagesAsync(messageSender, messageCount); + messageReceiver.RegisterMessageHandler( + async (message, token) => + { + TestUtility.Log($"Received message: SequenceNumber: {message.SystemProperties.SequenceNumber}"); + await Task.Delay(TimeSpan.FromSeconds(8)); + if (messageReceiver.ReceiveMode == ReceiveMode.PeekLock && !autoComplete) + { + await messageReceiver.CompleteAsync(message.SystemProperties.LockToken); + } + Interlocked.Increment(ref count); + + }, + new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = maxConcurrentCalls, AutoComplete = autoComplete }); + + await Task.Delay(TimeSpan.FromSeconds(2)); + // UnregisterMessageHandlerAsync should wait up to the provided timeout to finish the message handling tasks + await messageReceiver.UnregisterMessageHandlerAsync(TimeSpan.FromSeconds(20)); + Assert.True(count == maxConcurrentCalls); + } + + internal async Task OnMessageAsyncUnregisterHandlerShortTimeoutTestCase( + IMessageSender messageSender, + IMessageReceiver messageReceiver, + int maxConcurrentCalls, + bool autoComplete, + int messageCount) + { + var count = 0; + await TestUtility.SendMessagesAsync(messageSender, messageCount); + messageReceiver.RegisterMessageHandler( + async (message, token) => + { + TestUtility.Log($"Received message: SequenceNumber: {message.SystemProperties.SequenceNumber}"); + await Task.Delay(TimeSpan.FromSeconds(8)); + if (messageReceiver.ReceiveMode == ReceiveMode.PeekLock && !autoComplete) + { + await messageReceiver.CompleteAsync(message.SystemProperties.LockToken); + } + Interlocked.Increment(ref count); + }, + new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = maxConcurrentCalls, AutoComplete = autoComplete }); + + await Task.Delay(TimeSpan.FromSeconds(2)); + await messageReceiver.UnregisterMessageHandlerAsync(TimeSpan.FromSeconds(2)); + Assert.True(count == 0); + } + internal async Task OnMessageRegistrationWithoutPendingMessagesTestCase( IMessageSender messageSender, IMessageReceiver messageReceiver, diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/TestSessionHandler.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/TestSessionHandler.cs index 99b76a09c8d28..53c251f4ad94f 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/TestSessionHandler.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/TestSessionHandler.cs @@ -37,11 +37,21 @@ public TestSessionHandler( this.sessionMessageMap = new ConcurrentDictionary(); } + public void RegisterSessionHandler(Func handler, SessionHandlerOptions handlerOptions) + { + this.sessionPumpHost.OnSessionHandler(handler, handlerOptions); + } + public void RegisterSessionHandler(SessionHandlerOptions handlerOptions) { this.sessionPumpHost.OnSessionHandler(this.OnSessionHandler, this.sessionHandlerOptions); } + public async Task UnregisterSessionHandler(TimeSpan inflightSessionHandlerTasksWaitTimeout) + { + await this.sessionPumpHost.UnregisterSessionHandlerAsync(inflightSessionHandlerTasksWaitTimeout).ConfigureAwait(false); + } + public async Task SendSessionMessages() { await TestUtility.SendSessionMessagesAsync(this.sender, NumberOfSessions, MessagesPerSession);