From 8c46a41ddc62be34272ddd8f5980ca79316d3c62 Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Thu, 6 Aug 2020 15:37:21 -0700 Subject: [PATCH 01/22] add UnregisterMessageHandler method --- .../src/Core/IReceiverClient.cs | 5 ++++ .../src/Core/MessageReceiver.cs | 24 +++++++++++++++++++ .../src/MessagingEventSource.cs | 18 ++++++++++++++ .../src/QueueClient.cs | 9 +++++++ .../src/Resources.Designer.cs | 9 +++++++ .../src/Resources.resx | 3 +++ .../src/SubscriptionClient.cs | 9 +++++++ 7 files changed, 77 insertions(+) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs index 7116ce895fba..f5f6df422d40 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs @@ -63,6 +63,11 @@ public interface IReceiverClient : IClientEntity /// Enable prefetch to speed up the receive rate. void RegisterMessageHandler(Func handler, MessageHandlerOptions messageHandlerOptions); + /// + /// Unregister messgae hander from the receiver if there is active message handler registered. + /// + void UnregisterMessageHandler(); + /// /// Completes a using its lock token. This will delete the message from the queue. /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index a55266d092c4..9beec6a1e7b3 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -899,6 +899,30 @@ public void RegisterMessageHandler(Func handle this.OnMessageHandler(messageHandlerOptions, handler); } + /// + /// Unregister messgae hander from the receiver if there is active message handler registered. + /// + public void UnregisterMessageHandler() + { + this.ThrowIfClosed(); + + MessagingEventSource.Log.UnregisterMessageHandlerStart(this.ClientId); + lock (this.messageReceivePumpSyncLock) + { + if (this.receivePump != null) + { + this.receivePumpCancellationTokenSource.Cancel(); + this.receivePumpCancellationTokenSource.Dispose(); + this.receivePump = null; + } + else + { + throw new InvalidOperationException(Resources.MessageHandlerNotRegisteredYet); + } + } + MessagingEventSource.Log.UnregisterMessageHandlerStop(this.ClientId); + } + /// /// Registers a to be used with this receiver. /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessagingEventSource.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessagingEventSource.cs index b647b1b8b04e..da52ccdd30f7 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessagingEventSource.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessagingEventSource.cs @@ -1380,6 +1380,24 @@ public void ManagementSerializationException(string objectName, string details = this.WriteEvent(117, objectName, details); } } + + [Event(118, Level = EventLevel.Informational, Message = "{0}: Unregister MessageHandler start.")] + public void UnregisterMessageHandlerStart(string clientId) + { + if (this.IsEnabled()) + { + this.WriteEvent(118, clientId); + } + } + + [Event(119, Level = EventLevel.Informational, Message = "{0}: Unregister MessageHandler done.")] + public void UnregisterMessageHandlerStop(string clientId) + { + if (this.IsEnabled()) + { + this.WriteEvent(119, clientId); + } + } } internal static class TraceHelper diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs index ab199f975976..9ec878abf2fa 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs @@ -445,6 +445,15 @@ public void RegisterMessageHandler(Func handle this.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions); } + /// + /// Unregister messgae hander from the receiver if there is active message handler registered. + /// + public void UnregisterMessageHandler() + { + this.ThrowIfClosed(); + this.InnerReceiver.UnregisterMessageHandler(); + } + /// /// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to receive session-messages. /// This handler() is awaited on every time a new message is received by the queue client. diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.Designer.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.Designer.cs index 0795aac701f6..d69b0d385703 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.Designer.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.Designer.cs @@ -231,6 +231,15 @@ internal static string MessageHandlerAlreadyRegistered { } } + /// + /// Looks up a localized string similar to A message handler has not been registered yet.. + /// + internal static string MessageHandlerNotRegisteredYet { + get { + return ResourceManager.GetString("MessageHandlerNotRegisteredYet", resourceCulture); + } + } + /// /// Looks up a localized string similar to The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.. /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.resx b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.resx index c91ba594924e..aa7b26543f45 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.resx +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.resx @@ -192,6 +192,9 @@ A message handler has already been registered. + + A message handler has not been registered yet. + A session handler has already been registered. diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs index c5d7ebeecfc1..474b4d144f40 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs @@ -419,6 +419,15 @@ public void RegisterMessageHandler(Func handle this.InnerSubscriptionClient.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions); } + /// + /// Unregister messgae hander from the receiver if there is active message handler registered. + /// + public void UnregisterMessageHandler() + { + this.ThrowIfClosed(); + this.InnerSubscriptionClient.InnerReceiver.UnregisterMessageHandler(); + } + /// /// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to receive session-messages. /// This handler() is awaited on every time a new message is received by the subscription client. From db9b322f58bfe463002ec9389a17fd0758820c5f Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Sun, 9 Aug 2020 21:19:28 -0700 Subject: [PATCH 02/22] Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs Co-authored-by: Sean Feldman --- .../Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs index f5f6df422d40..ae8031a4ff5e 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs @@ -64,7 +64,7 @@ public interface IReceiverClient : IClientEntity void RegisterMessageHandler(Func handler, MessageHandlerOptions messageHandlerOptions); /// - /// Unregister messgae hander from the receiver if there is active message handler registered. + /// Unregister message handler from the receiver if there is an active message handler registered. /// void UnregisterMessageHandler(); @@ -120,4 +120,4 @@ public interface IReceiverClient : IClientEntity /// Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null); } -} \ No newline at end of file +} From da77a248bfd841de89bd91b81232fe0ab09c1a9a Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Sun, 9 Aug 2020 21:19:36 -0700 Subject: [PATCH 03/22] Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs Co-authored-by: Sean Feldman --- .../Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index 9beec6a1e7b3..033bebdfea7b 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -900,7 +900,7 @@ public void RegisterMessageHandler(Func handle } /// - /// Unregister messgae hander from the receiver if there is active message handler registered. + /// Unregister message hander from the receiver if there is an active message handler registered. /// public void UnregisterMessageHandler() { From 5e1576abcd1237d5c9daaff900e4e1148a51e544 Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Wed, 12 Aug 2020 18:52:27 -0700 Subject: [PATCH 04/22] Update the unregister method to be async and await for inflight operations to finish --- .../src/Core/IReceiverClient.cs | 2 +- .../src/Core/MessageReceiver.cs | 26 ++++++++++++------- .../src/MessageReceivePump.cs | 5 ++-- .../src/QueueClient.cs | 4 +-- .../src/Resources.Designer.cs | 9 ------- .../src/SubscriptionClient.cs | 4 +-- 6 files changed, 25 insertions(+), 25 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs index ae8031a4ff5e..98185d276bcf 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs @@ -66,7 +66,7 @@ public interface IReceiverClient : IClientEntity /// /// Unregister message handler from the receiver if there is an active message handler registered. /// - void UnregisterMessageHandler(); + Task UnregisterMessageHandler(); /// /// Completes a using its lock token. This will delete the message from the queue. diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index 033bebdfea7b..933110516ad4 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -902,23 +902,31 @@ public void RegisterMessageHandler(Func handle /// /// Unregister message hander from the receiver if there is an active message handler registered. /// - public void UnregisterMessageHandler() + public async Task UnregisterMessageHandler() { this.ThrowIfClosed(); MessagingEventSource.Log.UnregisterMessageHandlerStart(this.ClientId); lock (this.messageReceivePumpSyncLock) { - if (this.receivePump != null) - { - this.receivePumpCancellationTokenSource.Cancel(); - this.receivePumpCancellationTokenSource.Dispose(); - this.receivePump = null; - } - else + if (this.receivePump == null || this.receivePumpCancellationTokenSource.IsCancellationRequested) { - throw new InvalidOperationException(Resources.MessageHandlerNotRegisteredYet); + return; } + + this.receivePumpCancellationTokenSource.Cancel(); + this.receivePumpCancellationTokenSource.Dispose(); + } + + while (this.receivePump != null + && this.receivePump.maxConcurrentCallsSemaphoreSlim.CurrentCount < this.receivePump.registerHandlerOptions.MaxConcurrentCalls) + { + await Task.Delay(10); + } + + lock (this.messageReceivePumpSyncLock) + { + this.receivePump = null; } MessagingEventSource.Log.UnregisterMessageHandlerStop(this.ClientId); } diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs index f918b32e146d..c380d5152cc7 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs @@ -12,14 +12,15 @@ namespace Microsoft.Azure.ServiceBus sealed class MessageReceivePump { + public readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim; + public readonly MessageHandlerOptions registerHandlerOptions; readonly Func onMessageCallback; readonly string endpoint; - readonly MessageHandlerOptions registerHandlerOptions; readonly IMessageReceiver messageReceiver; readonly CancellationToken pumpCancellationToken; - readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim; readonly ServiceBusDiagnosticSource diagnosticSource; + public MessageReceivePump(IMessageReceiver messageReceiver, MessageHandlerOptions registerHandlerOptions, Func callback, diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs index 9ec878abf2fa..3bd672de16a0 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs @@ -448,10 +448,10 @@ public void RegisterMessageHandler(Func handle /// /// Unregister messgae hander from the receiver if there is active message handler registered. /// - public void UnregisterMessageHandler() + public async Task UnregisterMessageHandler() { this.ThrowIfClosed(); - this.InnerReceiver.UnregisterMessageHandler(); + await this.InnerReceiver.UnregisterMessageHandler(); } /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.Designer.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.Designer.cs index d69b0d385703..0795aac701f6 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.Designer.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.Designer.cs @@ -231,15 +231,6 @@ internal static string MessageHandlerAlreadyRegistered { } } - /// - /// Looks up a localized string similar to A message handler has not been registered yet.. - /// - internal static string MessageHandlerNotRegisteredYet { - get { - return ResourceManager.GetString("MessageHandlerNotRegisteredYet", resourceCulture); - } - } - /// /// Looks up a localized string similar to The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.. /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs index 474b4d144f40..e5e6a8eda75e 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs @@ -422,10 +422,10 @@ public void RegisterMessageHandler(Func handle /// /// Unregister messgae hander from the receiver if there is active message handler registered. /// - public void UnregisterMessageHandler() + public async Task UnregisterMessageHandler() { this.ThrowIfClosed(); - this.InnerSubscriptionClient.InnerReceiver.UnregisterMessageHandler(); + await this.InnerSubscriptionClient.InnerReceiver.UnregisterMessageHandler(); } /// From 61f0998428e24dfc59a5c6f2126568d05ba674ee Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Wed, 12 Aug 2020 20:47:22 -0700 Subject: [PATCH 05/22] Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs Co-authored-by: Sean Feldman --- .../Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs index e5e6a8eda75e..23149abedcf8 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs @@ -425,7 +425,7 @@ public void RegisterMessageHandler(Func handle public async Task UnregisterMessageHandler() { this.ThrowIfClosed(); - await this.InnerSubscriptionClient.InnerReceiver.UnregisterMessageHandler(); + await this.InnerSubscriptionClient.InnerReceiver.UnregisterMessageHandler().ConfigureAwait(false); } /// @@ -648,4 +648,4 @@ protected override async Task OnClosingAsync() } } } -} \ No newline at end of file +} From 2988d02069417c37c373fee12c5b696d89d36263 Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Wed, 12 Aug 2020 20:47:34 -0700 Subject: [PATCH 06/22] Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs Co-authored-by: Sean Feldman --- .../Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index 933110516ad4..fe763b35dacd 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -921,7 +921,7 @@ public async Task UnregisterMessageHandler() while (this.receivePump != null && this.receivePump.maxConcurrentCallsSemaphoreSlim.CurrentCount < this.receivePump.registerHandlerOptions.MaxConcurrentCalls) { - await Task.Delay(10); + await Task.Delay(10).ConfigureAwait(false); } lock (this.messageReceivePumpSyncLock) From d3faba2c5962709c320e92928878600fbff69a4a Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Wed, 12 Aug 2020 20:47:42 -0700 Subject: [PATCH 07/22] Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs Co-authored-by: Sean Feldman --- sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs index 3bd672de16a0..c163f38cde51 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs @@ -451,7 +451,7 @@ public void RegisterMessageHandler(Func handle public async Task UnregisterMessageHandler() { this.ThrowIfClosed(); - await this.InnerReceiver.UnregisterMessageHandler(); + await this.InnerReceiver.UnregisterMessageHandler().ConfigureAwait(false); } /// From 4b921a69e1c7c0c962ff2161b44821e35f1c08ec Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Thu, 13 Aug 2020 11:06:21 -0700 Subject: [PATCH 08/22] Change name to have async suffix and add to existing onMessageQueueTests --- .../src/Core/IReceiverClient.cs | 2 +- .../src/Core/MessageReceiver.cs | 3 ++- .../Microsoft.Azure.ServiceBus/src/QueueClient.cs | 4 ++-- .../src/SubscriptionClient.cs | 4 ++-- .../Diagnostics/QueueClientDiagnosticsTests.cs | 1 + .../tests/SenderReceiverClientTestBase.cs | 15 +++++++++++++++ 6 files changed, 23 insertions(+), 6 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs index 98185d276bcf..8628ac35d3a1 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs @@ -66,7 +66,7 @@ public interface IReceiverClient : IClientEntity /// /// Unregister message handler from the receiver if there is an active message handler registered. /// - Task UnregisterMessageHandler(); + Task UnregisterMessageHandlerAsync(); /// /// Completes a using its lock token. This will delete the message from the queue. diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index fe763b35dacd..d24ca7297874 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -902,7 +902,7 @@ public void RegisterMessageHandler(Func handle /// /// Unregister message hander from the receiver if there is an active message handler registered. /// - public async Task UnregisterMessageHandler() + public async Task UnregisterMessageHandlerAsync() { this.ThrowIfClosed(); @@ -911,6 +911,7 @@ public async Task UnregisterMessageHandler() { if (this.receivePump == null || this.receivePumpCancellationTokenSource.IsCancellationRequested) { + // Silently return if handler has already been unregistered. return; } diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs index c163f38cde51..224ec3a0ba32 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs @@ -448,10 +448,10 @@ public void RegisterMessageHandler(Func handle /// /// Unregister messgae hander from the receiver if there is active message handler registered. /// - public async Task UnregisterMessageHandler() + public async Task UnregisterMessageHandlerAsync() { this.ThrowIfClosed(); - await this.InnerReceiver.UnregisterMessageHandler().ConfigureAwait(false); + await this.InnerReceiver.UnregisterMessageHandlerAsync().ConfigureAwait(false); } /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs index 23149abedcf8..91bc6ace00f7 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs @@ -422,10 +422,10 @@ public void RegisterMessageHandler(Func handle /// /// Unregister messgae hander from the receiver if there is active message handler registered. /// - public async Task UnregisterMessageHandler() + public async Task UnregisterMessageHandlerAsync() { this.ThrowIfClosed(); - await this.InnerSubscriptionClient.InnerReceiver.UnregisterMessageHandler().ConfigureAwait(false); + await this.InnerSubscriptionClient.InnerReceiver.UnregisterMessageHandlerAsync().ConfigureAwait(false); } /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Diagnostics/QueueClientDiagnosticsTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Diagnostics/QueueClientDiagnosticsTests.cs index 616065e577e3..6ddb95bec493 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Diagnostics/QueueClientDiagnosticsTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Diagnostics/QueueClientDiagnosticsTests.cs @@ -84,6 +84,7 @@ await ServiceBusScope.UsingQueueAsync(partitioned: false, sessionEnabled: false, Assert.Equal(processStop.activity, processActivity); Assert.False(exceptionCalled); + } } finally diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs index 2430b0d7b18c..271cb57c65c7 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs @@ -273,6 +273,21 @@ internal async Task OnMessageAsyncTestCase( await Task.Delay(TimeSpan.FromSeconds(5)); } Assert.True(count == messageCount); + + count = 0; + await messageReceiver.UnregisterMessageHandlerAsync(); + await TestUtility.SendMessagesAsync(messageSender, messageCount); + stopwatch = Stopwatch.StartNew(); + while (stopwatch.Elapsed.TotalSeconds <= 60) + { + if (count == messageCount) + { + TestUtility.Log($"All '{messageCount}' messages Received."); + break; + } + await Task.Delay(TimeSpan.FromSeconds(5)); + } + Assert.True(count == 0); //<= maxConcurrentCalls); } internal async Task OnMessageRegistrationWithoutPendingMessagesTestCase( From 78aff541b49818497de3d574081b0014068407dd Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Thu, 13 Aug 2020 13:05:27 -0700 Subject: [PATCH 09/22] Add UnregisterSessionHandlerAsync and corresponding tests --- .../src/Core/IReceiverClient.cs | 4 ++- .../src/Core/MessageReceiver.cs | 4 ++- .../src/IQueueClient.cs | 7 +++++ .../src/ISubscriptionClient.cs | 7 +++++ .../src/MessagingEventSource.cs | 18 +++++++++++ .../src/QueueClient.cs | 15 +++++++++- .../src/SessionPumpHost.cs | 30 +++++++++++++++++++ .../src/SessionReceivePump.cs | 6 ++-- .../src/SubscriptionClient.cs | 15 +++++++++- .../QueueClientDiagnosticsTests.cs | 1 - .../tests/OnSessionQueueTests.cs | 9 ++++++ .../tests/OnSessionTopicSubscriptionTests.cs | 9 ++++++ .../tests/SenderReceiverClientTestBase.cs | 2 +- .../tests/TestSessionHandler.cs | 23 ++++++++++++++ 14 files changed, 141 insertions(+), 9 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs index 8628ac35d3a1..6b340bf84601 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs @@ -64,7 +64,9 @@ public interface IReceiverClient : IClientEntity void RegisterMessageHandler(Func handler, MessageHandlerOptions messageHandlerOptions); /// - /// Unregister message handler from the receiver if there is an active message handler registered. + /// Unregister message hander from the receiver if there is an active message handler registered. This operation waits for the completion + /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously + /// registered. /// Task UnregisterMessageHandlerAsync(); diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index d24ca7297874..1e589c158e6c 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -900,7 +900,9 @@ public void RegisterMessageHandler(Func handle } /// - /// Unregister message hander from the receiver if there is an active message handler registered. + /// Unregister message hander from the receiver if there is an active message handler registered. This operation waits for the completion + /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously + /// registered. /// public async Task UnregisterMessageHandlerAsync() { diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs index f45ecbd4f8c1..285ed83b60a7 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs @@ -77,5 +77,12 @@ public interface IQueueClient : IReceiverClient, ISenderClient /// Options used to configure the settings of the session pump. /// Enable prefetch to speed up the receive rate. void RegisterSessionHandler(Func handler, SessionHandlerOptions sessionHandlerOptions); + + /// + /// Unregister session hander from the receiver if there is an active session handler registered. This operation waits for the completion + /// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously + /// registered. + /// + Task UnregisterSessionHandlerAsync(); } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs index 0e12e751e958..2845d5c0012e 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs @@ -114,5 +114,12 @@ public interface ISubscriptionClient : IReceiverClient /// Options used to configure the settings of the session pump. /// Enable prefetch to speed up the receive rate. void RegisterSessionHandler(Func handler, SessionHandlerOptions sessionHandlerOptions); + + /// + /// Unregister session hander from the receiver if there is an active session handler registered. This operation waits for the completion + /// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously + /// registered. + /// + Task UnregisterSessionHandlerAsync(); } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessagingEventSource.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessagingEventSource.cs index da52ccdd30f7..a90e6a942185 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessagingEventSource.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessagingEventSource.cs @@ -1398,6 +1398,24 @@ public void UnregisterMessageHandlerStop(string clientId) this.WriteEvent(119, clientId); } } + + [Event(120, Level = EventLevel.Informational, Message = "{0}: Unregister SessionHandler start.")] + public void UnregisterSessionHandlerStart(string clientId) + { + if (this.IsEnabled()) + { + this.WriteEvent(120, clientId); + } + } + + [Event(121, Level = EventLevel.Informational, Message = "{0}: Unregister SessionHandler done.")] + public void UnregisterSessionHandlerStop(string clientId) + { + if (this.IsEnabled()) + { + this.WriteEvent(121, clientId); + } + } } internal static class TraceHelper diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs index 224ec3a0ba32..be498b988376 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs @@ -446,7 +446,9 @@ public void RegisterMessageHandler(Func handle } /// - /// Unregister messgae hander from the receiver if there is active message handler registered. + /// Unregister message hander from the receiver if there is an active message handler registered. This operation waits for the completion + /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously + /// registered. /// public async Task UnregisterMessageHandlerAsync() { @@ -484,6 +486,17 @@ public void RegisterSessionHandler(Func + /// Unregister session hander from the receiver if there is an active session handler registered. This operation waits for the completion + /// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously + /// registered. + /// + public async Task UnregisterSessionHandlerAsync() + { + this.ThrowIfClosed(); + await this.SessionPumpHost.UnregisterSessionHandlerAsync().ConfigureAwait(false); + } + /// /// Schedules a message to appear on Service Bus at a later time. /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs index b3d6b8ab67c0..b921f713a1a1 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs @@ -82,5 +82,35 @@ public void OnSessionHandler( MessagingEventSource.Log.RegisterOnSessionHandlerStop(this.ClientId); } + + public async Task UnregisterSessionHandlerAsync() + { + MessagingEventSource.Log.UnregisterSessionHandlerStart(this.ClientId); + lock (this.syncLock) + { + if (this.sessionReceivePump == null || this.sessionPumpCancellationTokenSource.IsCancellationRequested) + { + // Silently return if handler has already been unregistered. + return; + } + + this.sessionPumpCancellationTokenSource.Cancel(); + this.sessionPumpCancellationTokenSource.Dispose(); + } + + while (this.sessionReceivePump != null + && (this.sessionReceivePump.maxConcurrentSessionsSemaphoreSlim.CurrentCount < + this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentSessions + || this.sessionReceivePump.maxPendingAcceptSessionsSemaphoreSlim.CurrentCount < this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentAcceptSessionCalls)) + { + await Task.Delay(10).ConfigureAwait(false); + } + + lock (this.sessionPumpCancellationTokenSource) + { + this.sessionReceivePump = null; + } + MessagingEventSource.Log.UnregisterSessionHandlerStop(this.ClientId); + } } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionReceivePump.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionReceivePump.cs index 9a35b3d54d35..f469220dab86 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionReceivePump.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionReceivePump.cs @@ -11,15 +11,15 @@ namespace Microsoft.Azure.ServiceBus sealed class SessionReceivePump { + public readonly SemaphoreSlim maxConcurrentSessionsSemaphoreSlim; + public readonly SemaphoreSlim maxPendingAcceptSessionsSemaphoreSlim; + public readonly SessionHandlerOptions sessionHandlerOptions; readonly string clientId; readonly ISessionClient client; readonly Func userOnSessionCallback; - readonly SessionHandlerOptions sessionHandlerOptions; readonly string endpoint; readonly string entityPath; readonly CancellationToken pumpCancellationToken; - readonly SemaphoreSlim maxConcurrentSessionsSemaphoreSlim; - readonly SemaphoreSlim maxPendingAcceptSessionsSemaphoreSlim; private readonly ServiceBusDiagnosticSource diagnosticSource; public SessionReceivePump(string clientId, diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs index 91bc6ace00f7..bac25a316a21 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs @@ -420,7 +420,9 @@ public void RegisterMessageHandler(Func handle } /// - /// Unregister messgae hander from the receiver if there is active message handler registered. + /// Unregister message hander from the receiver if there is an active message handler registered. This operation waits for the completion + /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously + /// registered. /// public async Task UnregisterMessageHandlerAsync() { @@ -458,6 +460,17 @@ public void RegisterSessionHandler(Func + /// Unregister session hander from the receiver if there is an active session handler registered. This operation waits for the completion + /// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously + /// registered. + /// + public async Task UnregisterSessionHandlerAsync() + { + this.ThrowIfClosed(); + await this.SessionPumpHost.UnregisterSessionHandlerAsync().ConfigureAwait(false); + } + /// /// Adds a rule to the current subscription to filter the messages reaching from topic to the subscription. /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Diagnostics/QueueClientDiagnosticsTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Diagnostics/QueueClientDiagnosticsTests.cs index 6ddb95bec493..616065e577e3 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Diagnostics/QueueClientDiagnosticsTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/Diagnostics/QueueClientDiagnosticsTests.cs @@ -84,7 +84,6 @@ await ServiceBusScope.UsingQueueAsync(partitioned: false, sessionEnabled: false, Assert.Equal(processStop.activity, processActivity); Assert.False(exceptionCalled); - } } finally diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs index 42da0e7ce4c4..5e0ee3582778 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs @@ -186,6 +186,15 @@ await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueNa // Verify messages were received. await testSessionHandler.VerifyRun(); + + testSessionHandler.ClearData(); + + // Unregister handler + await testSessionHandler.UnregisterSessionHandler(); + + await testSessionHandler.SendSessionMessages(); + + await testSessionHandler.VerifySessionHandlerNotInvokedAndNoMessageReceived(); } finally { diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs index 161836250b28..74835cea10e1 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs @@ -129,6 +129,15 @@ await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicN // Verify messages were received. await testSessionHandler.VerifyRun(); + + testSessionHandler.ClearData(); + + // Unregister handler + await testSessionHandler.UnregisterSessionHandler(); + + await testSessionHandler.SendSessionMessages(); + + await testSessionHandler.VerifySessionHandlerNotInvokedAndNoMessageReceived(); } finally { diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs index 271cb57c65c7..a66ff6b73490 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs @@ -287,7 +287,7 @@ internal async Task OnMessageAsyncTestCase( } await Task.Delay(TimeSpan.FromSeconds(5)); } - Assert.True(count == 0); //<= maxConcurrentCalls); + Assert.True(count == 0); } internal async Task OnMessageRegistrationWithoutPendingMessagesTestCase( diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/TestSessionHandler.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/TestSessionHandler.cs index 99b76a09c8d2..4453a1854641 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/TestSessionHandler.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/TestSessionHandler.cs @@ -42,6 +42,11 @@ public void RegisterSessionHandler(SessionHandlerOptions handlerOptions) this.sessionPumpHost.OnSessionHandler(this.OnSessionHandler, this.sessionHandlerOptions); } + public async Task UnregisterSessionHandler() + { + await this.sessionPumpHost.UnregisterSessionHandlerAsync().ConfigureAwait(false); + } + public async Task SendSessionMessages() { await TestUtility.SendSessionMessagesAsync(this.sender, NumberOfSessions, MessagesPerSession); @@ -86,6 +91,24 @@ public async Task VerifyRun() Assert.True(this.totalMessageCount == MessagesPerSession * NumberOfSessions); } + public async Task VerifySessionHandlerNotInvokedAndNoMessageReceived() + { + // Wait for the OnMessage Tasks to finish + var stopwatch = Stopwatch.StartNew(); + while (stopwatch.Elapsed.TotalSeconds <= 180) + { + if (this.totalMessageCount == MessagesPerSession * NumberOfSessions) + { + TestUtility.Log($"All '{this.totalMessageCount}' messages Received."); + break; + } + await Task.Delay(TimeSpan.FromSeconds(5)); + } + + Assert.True(this.sessionMessageMap.Keys.Count == 0); + Assert.True(this.totalMessageCount == 0); + } + public void ClearData() { this.totalMessageCount = 0; From 25f6b8d47a14a8291bd2cbbf6ef9c343f7dc0edd Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Thu, 13 Aug 2020 15:27:59 -0700 Subject: [PATCH 10/22] nit --- .../Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs | 2 +- .../Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs | 2 +- sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs | 2 +- .../Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs | 2 +- sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs | 4 ++-- sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.resx | 3 --- .../Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs | 4 ++-- .../Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs | 4 +--- .../tests/OnSessionTopicSubscriptionTests.cs | 4 +--- .../tests/SenderReceiverClientTestBase.cs | 1 + 10 files changed, 11 insertions(+), 17 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs index 6b340bf84601..783d81febf9a 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs @@ -64,7 +64,7 @@ public interface IReceiverClient : IClientEntity void RegisterMessageHandler(Func handler, MessageHandlerOptions messageHandlerOptions); /// - /// Unregister message hander from the receiver if there is an active message handler registered. This operation waits for the completion + /// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously /// registered. /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index 1e589c158e6c..997c4ef9f336 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -900,7 +900,7 @@ public void RegisterMessageHandler(Func handle } /// - /// Unregister message hander from the receiver if there is an active message handler registered. This operation waits for the completion + /// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously /// registered. /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs index 285ed83b60a7..27ee46afca29 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs @@ -79,7 +79,7 @@ public interface IQueueClient : IReceiverClient, ISenderClient void RegisterSessionHandler(Func handler, SessionHandlerOptions sessionHandlerOptions); /// - /// Unregister session hander from the receiver if there is an active session handler registered. This operation waits for the completion + /// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion /// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously /// registered. /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs index 2845d5c0012e..d05f6457c800 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs @@ -116,7 +116,7 @@ public interface ISubscriptionClient : IReceiverClient void RegisterSessionHandler(Func handler, SessionHandlerOptions sessionHandlerOptions); /// - /// Unregister session hander from the receiver if there is an active session handler registered. This operation waits for the completion + /// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion /// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously /// registered. /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs index be498b988376..8831d9e9dce3 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs @@ -446,7 +446,7 @@ public void RegisterMessageHandler(Func handle } /// - /// Unregister message hander from the receiver if there is an active message handler registered. This operation waits for the completion + /// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously /// registered. /// @@ -487,7 +487,7 @@ public void RegisterSessionHandler(Func - /// Unregister session hander from the receiver if there is an active session handler registered. This operation waits for the completion + /// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion /// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously /// registered. /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.resx b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.resx index aa7b26543f45..c91ba594924e 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.resx +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Resources.resx @@ -192,9 +192,6 @@ A message handler has already been registered. - - A message handler has not been registered yet. - A session handler has already been registered. diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs index bac25a316a21..581512bbdc7d 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs @@ -420,7 +420,7 @@ public void RegisterMessageHandler(Func handle } /// - /// Unregister message hander from the receiver if there is an active message handler registered. This operation waits for the completion + /// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously /// registered. /// @@ -461,7 +461,7 @@ public void RegisterSessionHandler(Func - /// Unregister session hander from the receiver if there is an active session handler registered. This operation waits for the completion + /// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion /// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously /// registered. /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs index 5e0ee3582778..0cced09475b6 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs @@ -189,11 +189,9 @@ await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueNa testSessionHandler.ClearData(); - // Unregister handler + // Unregister handler to verify that receives should not happen await testSessionHandler.UnregisterSessionHandler(); - await testSessionHandler.SendSessionMessages(); - await testSessionHandler.VerifySessionHandlerNotInvokedAndNoMessageReceived(); } finally diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs index 74835cea10e1..8d605db5eb2a 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs @@ -132,11 +132,9 @@ await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicN testSessionHandler.ClearData(); - // Unregister handler + // Unregister handler to verify that receives should not happen await testSessionHandler.UnregisterSessionHandler(); - await testSessionHandler.SendSessionMessages(); - await testSessionHandler.VerifySessionHandlerNotInvokedAndNoMessageReceived(); } finally diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs index a66ff6b73490..ead79b6d5d9f 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs @@ -274,6 +274,7 @@ internal async Task OnMessageAsyncTestCase( } Assert.True(count == messageCount); + // Clear count and unregister handler to verify that receives should not happen count = 0; await messageReceiver.UnregisterMessageHandlerAsync(); await TestUtility.SendMessagesAsync(messageSender, messageCount); From d4a55891acc3579d92691cacf60f8a6684d62382 Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Thu, 13 Aug 2020 16:39:54 -0700 Subject: [PATCH 11/22] nit --- .../Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs index c380d5152cc7..239bd530f6ca 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs @@ -20,7 +20,6 @@ sealed class MessageReceivePump readonly CancellationToken pumpCancellationToken; readonly ServiceBusDiagnosticSource diagnosticSource; - public MessageReceivePump(IMessageReceiver messageReceiver, MessageHandlerOptions registerHandlerOptions, Func callback, From cd8f044facc1ef451f3dfc3240a0c8e2233db461 Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Sun, 23 Aug 2020 12:12:15 -0700 Subject: [PATCH 12/22] Add a new cancellation type to not cancel inflight message handling operations when unregister is called. --- .../src/Core/MessageReceiver.cs | 10 +++++++++- .../src/MessageReceivePump.cs | 7 +++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index 997c4ef9f336..4562afa0f8db 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -54,7 +54,11 @@ public class MessageReceiver : ClientEntity, IMessageReceiver int prefetchCount; long lastPeekedSequenceNumber; MessageReceivePump receivePump; + // Cancellation token to cancel the message pump. Once this is fired, all future message handling operations registered by application will be + // cancelled. CancellationTokenSource receivePumpCancellationTokenSource; + // Cancellation token to cancel the inflight message handling operations registered by application in the message pump. + CancellationTokenSource runningTaskCancellationTokenSource; /// /// Creates a new MessageReceiver from a . @@ -1038,6 +1042,9 @@ protected override async Task OnClosingAsync() { this.receivePumpCancellationTokenSource.Cancel(); this.receivePumpCancellationTokenSource.Dispose(); + // For back-compatibility + this.runningTaskCancellationTokenSource.Cancel(); + this.runningTaskCancellationTokenSource.Dispose(); this.receivePump = null; } } @@ -1314,7 +1321,8 @@ protected virtual void OnMessageHandler( } this.receivePumpCancellationTokenSource = new CancellationTokenSource(); - this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.receivePumpCancellationTokenSource.Token); + this.runningTaskCancellationTokenSource = new CancellationTokenSource(); + this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.receivePumpCancellationTokenSource.Token, this.runningTaskCancellationTokenSource.Token); } try diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs index 239bd530f6ca..89750fd338c8 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs @@ -18,19 +18,22 @@ sealed class MessageReceivePump readonly string endpoint; readonly IMessageReceiver messageReceiver; readonly CancellationToken pumpCancellationToken; + readonly CancellationToken runningTaskCancellationToken; readonly ServiceBusDiagnosticSource diagnosticSource; public MessageReceivePump(IMessageReceiver messageReceiver, MessageHandlerOptions registerHandlerOptions, Func callback, Uri endpoint, - CancellationToken pumpCancellationToken) + CancellationToken pumpCancellationToken, + CancellationToken runningTaskCancellationToken) { this.messageReceiver = messageReceiver ?? throw new ArgumentNullException(nameof(messageReceiver)); this.registerHandlerOptions = registerHandlerOptions; this.onMessageCallback = callback; this.endpoint = endpoint.Authority; this.pumpCancellationToken = pumpCancellationToken; + this.runningTaskCancellationToken = runningTaskCancellationToken; this.maxConcurrentCallsSemaphoreSlim = new SemaphoreSlim(this.registerHandlerOptions.MaxConcurrentCalls); this.diagnosticSource = new ServiceBusDiagnosticSource(messageReceiver.Path, endpoint); } @@ -163,7 +166,7 @@ async Task MessageDispatchTask(Message message) try { MessagingEventSource.Log.MessageReceiverPumpUserCallbackStart(this.messageReceiver.ClientId, message); - await this.onMessageCallback(message, this.pumpCancellationToken).ConfigureAwait(false); + await this.onMessageCallback(message, this.runningTaskCancellationToken).ConfigureAwait(false); MessagingEventSource.Log.MessageReceiverPumpUserCallbackStop(this.messageReceiver.ClientId, message); } From fff0016c44f314b4ca62b5e25dbfd45d141e62e6 Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Mon, 24 Aug 2020 10:25:16 -0700 Subject: [PATCH 13/22] Add another type of cancellation token to session handler path --- .../src/Core/MessageReceiver.cs | 8 +++++++- .../src/SessionPumpHost.cs | 17 +++++++++++++++-- .../src/SessionReceivePump.cs | 9 ++++++--- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index 4562afa0f8db..452367e82e15 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -1321,7 +1321,13 @@ protected virtual void OnMessageHandler( } this.receivePumpCancellationTokenSource = new CancellationTokenSource(); - this.runningTaskCancellationTokenSource = new CancellationTokenSource(); + + // Running task cancellation token source can be reused if previously UnregisterMessageHandlerAsync was called + if (this.runningTaskCancellationTokenSource == null) + { + this.runningTaskCancellationTokenSource = new CancellationTokenSource(); + } + this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.receivePumpCancellationTokenSource.Token, this.runningTaskCancellationTokenSource.Token); } diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs index b921f713a1a1..80c0890b2ba6 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs @@ -12,6 +12,7 @@ internal sealed class SessionPumpHost readonly object syncLock; SessionReceivePump sessionReceivePump; CancellationTokenSource sessionPumpCancellationTokenSource; + CancellationTokenSource runningTaskCancellationTokenSource; readonly Uri endpoint; public SessionPumpHost(string clientId, ReceiveMode receiveMode, ISessionClient sessionClient, Uri endpoint) @@ -35,6 +36,9 @@ public void Close() { this.sessionPumpCancellationTokenSource?.Cancel(); this.sessionPumpCancellationTokenSource?.Dispose(); + // For back-compatibility + this.runningTaskCancellationTokenSource?.Cancel(); + this.runningTaskCancellationTokenSource?.Dispose(); this.sessionReceivePump = null; } } @@ -53,6 +57,13 @@ public void OnSessionHandler( } this.sessionPumpCancellationTokenSource = new CancellationTokenSource(); + + // Running task cancellation token source can be reused if previously UnregisterSessionHandlerAsync was called + if (this.runningTaskCancellationTokenSource == null) + { + this.runningTaskCancellationTokenSource = new CancellationTokenSource(); + } + this.sessionReceivePump = new SessionReceivePump( this.ClientId, this.SessionClient, @@ -60,7 +71,8 @@ public void OnSessionHandler( sessionHandlerOptions, callback, this.endpoint, - this.sessionPumpCancellationTokenSource.Token); + this.sessionPumpCancellationTokenSource.Token, + this.runningTaskCancellationTokenSource.Token); } try @@ -101,7 +113,8 @@ public async Task UnregisterSessionHandlerAsync() while (this.sessionReceivePump != null && (this.sessionReceivePump.maxConcurrentSessionsSemaphoreSlim.CurrentCount < this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentSessions - || this.sessionReceivePump.maxPendingAcceptSessionsSemaphoreSlim.CurrentCount < this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentAcceptSessionCalls)) + || this.sessionReceivePump.maxPendingAcceptSessionsSemaphoreSlim.CurrentCount < + this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentAcceptSessionCalls)) { await Task.Delay(10).ConfigureAwait(false); } diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionReceivePump.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionReceivePump.cs index f469220dab86..695511419885 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionReceivePump.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionReceivePump.cs @@ -20,6 +20,7 @@ sealed class SessionReceivePump readonly string endpoint; readonly string entityPath; readonly CancellationToken pumpCancellationToken; + readonly CancellationToken runningTaskCancellationToken; private readonly ServiceBusDiagnosticSource diagnosticSource; public SessionReceivePump(string clientId, @@ -28,7 +29,8 @@ public SessionReceivePump(string clientId, SessionHandlerOptions sessionHandlerOptions, Func callback, Uri endpoint, - CancellationToken token) + CancellationToken pumpToken, + CancellationToken runningTaskToken) { this.client = client ?? throw new ArgumentException(nameof(client)); this.clientId = clientId; @@ -37,7 +39,8 @@ public SessionReceivePump(string clientId, this.userOnSessionCallback = callback; this.endpoint = endpoint.Authority; this.entityPath = client.EntityPath; - this.pumpCancellationToken = token; + this.pumpCancellationToken = pumpToken; + this.runningTaskCancellationToken = runningTaskToken; this.maxConcurrentSessionsSemaphoreSlim = new SemaphoreSlim(this.sessionHandlerOptions.MaxConcurrentSessions); this.maxPendingAcceptSessionsSemaphoreSlim = new SemaphoreSlim(this.sessionHandlerOptions.MaxConcurrentAcceptSessionCalls); this.diagnosticSource = new ServiceBusDiagnosticSource(client.EntityPath, endpoint); @@ -229,7 +232,7 @@ async Task MessagePumpTaskAsync(IMessageSession session) var callbackExceptionOccurred = false; try { - processTask = this.userOnSessionCallback(session, message, this.pumpCancellationToken); + processTask = this.userOnSessionCallback(session, message, this.runningTaskCancellationToken); await processTask.ConfigureAwait(false); } catch (Exception exception) From fae7bf149bd0e053568cf698991a632be226e714 Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Mon, 24 Aug 2020 10:32:31 -0700 Subject: [PATCH 14/22] nit --- .../Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index 452367e82e15..0c7e3fcd88bd 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -1344,6 +1344,8 @@ protected virtual void OnMessageHandler( { this.receivePumpCancellationTokenSource.Cancel(); this.receivePumpCancellationTokenSource.Dispose(); + //this.runningTaskCancellationTokenSource.Cancel(); + //this.runningTaskCancellationTokenSource.Dispose(); this.receivePump = null; } } From 642dc36d06df5e247b3ea7528bcbad7933a68175 Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Thu, 3 Sep 2020 17:08:47 -0700 Subject: [PATCH 15/22] Add a timeout parameter to unregister functions and add according unit tests --- .../src/Core/IInnerSubscriptionClient.cs | 1 + .../src/Core/IReceiverClient.cs | 3 +- .../src/Core/MessageReceiver.cs | 10 +- .../src/IQueueClient.cs | 3 +- .../src/ISubscriptionClient.cs | 3 +- .../src/QueueClient.cs | 10 +- .../src/SessionPumpHost.cs | 11 +- .../src/SubscriptionClient.cs | 10 +- .../tests/OnMessageQueueTests.cs | 36 ++++++ .../tests/OnMessageTopicSubscriptionTests.cs | 50 +++++++ .../tests/OnSessionQueueTests.cs | 106 ++++++++++++++- .../tests/OnSessionTopicSubscriptionTests.cs | 122 +++++++++++++++++- .../tests/SenderReceiverClientTestBase.cs | 68 ++++++++-- .../tests/TestSessionHandler.cs | 27 +--- 14 files changed, 406 insertions(+), 54 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IInnerSubscriptionClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IInnerSubscriptionClient.cs index 0dc3e0d3be6e..8251ae612de2 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IInnerSubscriptionClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IInnerSubscriptionClient.cs @@ -3,6 +3,7 @@ namespace Microsoft.Azure.ServiceBus.Core { + using System; using System.Collections.Generic; using System.Threading.Tasks; diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs index 783d81febf9a..85a374f3b081 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverClient.cs @@ -67,8 +67,9 @@ public interface IReceiverClient : IClientEntity /// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously /// registered. + /// is the waitTimeout for inflight message handling tasks. /// - Task UnregisterMessageHandlerAsync(); + Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout); /// /// Completes a using its lock token. This will delete the message from the queue. diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index 0c7e3fcd88bd..0d7fcea6e203 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -907,11 +907,17 @@ public void RegisterMessageHandler(Func handle /// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously /// registered. + /// is the waitTimeout for inflight message handling tasks. /// - public async Task UnregisterMessageHandlerAsync() + public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout) { this.ThrowIfClosed(); + if (inflightMessageHandlerTasksWaitTimeout <= TimeSpan.Zero) + { + throw Fx.Exception.ArgumentOutOfRange(nameof(inflightMessageHandlerTasksWaitTimeout), inflightMessageHandlerTasksWaitTimeout, Resources.TimeoutMustBePositiveNonZero.FormatForUser(nameof(inflightMessageHandlerTasksWaitTimeout), inflightMessageHandlerTasksWaitTimeout)); + } + MessagingEventSource.Log.UnregisterMessageHandlerStart(this.ClientId); lock (this.messageReceivePumpSyncLock) { @@ -925,7 +931,9 @@ public async Task UnregisterMessageHandlerAsync() this.receivePumpCancellationTokenSource.Dispose(); } + Stopwatch stopWatch = Stopwatch.StartNew(); while (this.receivePump != null + && stopWatch.Elapsed < inflightMessageHandlerTasksWaitTimeout && this.receivePump.maxConcurrentCallsSemaphoreSlim.CurrentCount < this.receivePump.registerHandlerOptions.MaxConcurrentCalls) { await Task.Delay(10).ConfigureAwait(false); diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs index 27ee46afca29..8ce76dcb0c0b 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs @@ -82,7 +82,8 @@ public interface IQueueClient : IReceiverClient, ISenderClient /// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion /// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously /// registered. + /// is the waitTimeout for inflight session handling tasks. /// - Task UnregisterSessionHandlerAsync(); + Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout); } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs index d05f6457c800..e8e19eed3376 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/ISubscriptionClient.cs @@ -119,7 +119,8 @@ public interface ISubscriptionClient : IReceiverClient /// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion /// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously /// registered. + /// is the waitTimeout for inflight session handling tasks. /// - Task UnregisterSessionHandlerAsync(); + Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout); } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs index 023e241d6b0a..36c7e6b01bcd 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs @@ -450,11 +450,12 @@ public void RegisterMessageHandler(Func handle /// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously /// registered. + /// is the waitTimeout for inflight message handling tasks. /// - public async Task UnregisterMessageHandlerAsync() + public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout) { this.ThrowIfClosed(); - await this.InnerReceiver.UnregisterMessageHandlerAsync().ConfigureAwait(false); + await this.InnerReceiver.UnregisterMessageHandlerAsync(inflightMessageHandlerTasksWaitTimeout).ConfigureAwait(false); } /// @@ -491,11 +492,12 @@ public void RegisterSessionHandler(Func is the waitTimeout for inflight session handling tasks. /// - public async Task UnregisterSessionHandlerAsync() + public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout) { this.ThrowIfClosed(); - await this.SessionPumpHost.UnregisterSessionHandlerAsync().ConfigureAwait(false); + await this.SessionPumpHost.UnregisterSessionHandlerAsync(inflightSessionHandlerTasksWaitTimeout).ConfigureAwait(false); } /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs index 80c0890b2ba6..6f684f37b6f1 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs @@ -3,7 +3,9 @@ namespace Microsoft.Azure.ServiceBus { + using Microsoft.Azure.ServiceBus.Primitives; using System; + using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -95,8 +97,13 @@ public void OnSessionHandler( MessagingEventSource.Log.RegisterOnSessionHandlerStop(this.ClientId); } - public async Task UnregisterSessionHandlerAsync() + public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout) { + if (inflightSessionHandlerTasksWaitTimeout <= TimeSpan.Zero) + { + throw Fx.Exception.ArgumentOutOfRange(nameof(inflightSessionHandlerTasksWaitTimeout), inflightSessionHandlerTasksWaitTimeout, Resources.TimeoutMustBePositiveNonZero.FormatForUser(nameof(inflightSessionHandlerTasksWaitTimeout), inflightSessionHandlerTasksWaitTimeout)); + } + MessagingEventSource.Log.UnregisterSessionHandlerStart(this.ClientId); lock (this.syncLock) { @@ -110,7 +117,9 @@ public async Task UnregisterSessionHandlerAsync() this.sessionPumpCancellationTokenSource.Dispose(); } + Stopwatch stopWatch = Stopwatch.StartNew(); while (this.sessionReceivePump != null + && stopWatch.Elapsed < inflightSessionHandlerTasksWaitTimeout && (this.sessionReceivePump.maxConcurrentSessionsSemaphoreSlim.CurrentCount < this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentSessions || this.sessionReceivePump.maxPendingAcceptSessionsSemaphoreSlim.CurrentCount < diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs index 581512bbdc7d..aa55ec86eb0b 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClient.cs @@ -423,11 +423,12 @@ public void RegisterMessageHandler(Func handle /// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion /// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously /// registered. + /// is the waitTimeout for inflight message handling tasks. /// - public async Task UnregisterMessageHandlerAsync() + public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout) { this.ThrowIfClosed(); - await this.InnerSubscriptionClient.InnerReceiver.UnregisterMessageHandlerAsync().ConfigureAwait(false); + await this.InnerSubscriptionClient.InnerReceiver.UnregisterMessageHandlerAsync(inflightMessageHandlerTasksWaitTimeout).ConfigureAwait(false); } /// @@ -464,11 +465,12 @@ public void RegisterSessionHandler(Func is the waitTimeout for inflight session handling tasks. /// - public async Task UnregisterSessionHandlerAsync() + public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout) { this.ThrowIfClosed(); - await this.SessionPumpHost.UnregisterSessionHandlerAsync().ConfigureAwait(false); + await this.SessionPumpHost.UnregisterSessionHandlerAsync(inflightSessionHandlerTasksWaitTimeout).ConfigureAwait(false); } /// diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageQueueTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageQueueTests.cs index ede0b52db34b..467f1a76e45f 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageQueueTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageQueueTests.cs @@ -132,6 +132,42 @@ await this.OnMessageAsyncTestCase( await queueClient.CloseAsync(); } }); + + await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueName => + { + var queueClient = new QueueClient(TestUtility.NamespaceConnectionString, queueName, mode); + try + { + await this.OnMessageAsyncUnregisterHandlerLongTimeoutTestCase( + queueClient.InnerSender, + queueClient.InnerReceiver, + maxConcurrentCalls, + autoComplete, + messageCount); + } + finally + { + await queueClient.CloseAsync(); + } + }); + + await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueName => + { + var queueClient = new QueueClient(TestUtility.NamespaceConnectionString, queueName, mode); + try + { + await this.OnMessageAsyncUnregisterHandlerShortTimeoutTestCase( + queueClient.InnerSender, + queueClient.InnerReceiver, + maxConcurrentCalls, + autoComplete, + messageCount); + } + finally + { + await queueClient.CloseAsync(); + } + }); } } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageTopicSubscriptionTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageTopicSubscriptionTests.cs index 251778bec6b5..82b5110159e2 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageTopicSubscriptionTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnMessageTopicSubscriptionTests.cs @@ -62,6 +62,56 @@ await this.OnMessageAsyncTestCase( await topicClient.CloseAsync(); } }); + + await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicName, subscriptionName) => + { + var topicClient = new TopicClient(TestUtility.NamespaceConnectionString, topicName); + var subscriptionClient = new SubscriptionClient( + TestUtility.NamespaceConnectionString, + topicName, + subscriptionName, + mode); + + try + { + await this.OnMessageAsyncUnregisterHandlerLongTimeoutTestCase( + topicClient.InnerSender, + subscriptionClient.InnerSubscriptionClient.InnerReceiver, + maxConcurrentCalls, + autoComplete, + messageCount); + } + finally + { + await subscriptionClient.CloseAsync(); + await topicClient.CloseAsync(); + } + }); + + await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicName, subscriptionName) => + { + var topicClient = new TopicClient(TestUtility.NamespaceConnectionString, topicName); + var subscriptionClient = new SubscriptionClient( + TestUtility.NamespaceConnectionString, + topicName, + subscriptionName, + mode); + + try + { + await this.OnMessageAsyncUnregisterHandlerShortTimeoutTestCase( + topicClient.InnerSender, + subscriptionClient.InnerSubscriptionClient.InnerReceiver, + maxConcurrentCalls, + autoComplete, + messageCount); + } + finally + { + await subscriptionClient.CloseAsync(); + await topicClient.CloseAsync(); + } + }); } } } \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs index 0cced09475b6..90a62d6c4204 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs @@ -6,6 +6,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests using System; using System.Collections.Generic; using System.Diagnostics; + using System.Threading; using System.Threading.Tasks; using Xunit; @@ -188,11 +189,110 @@ await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueNa await testSessionHandler.VerifyRun(); testSessionHandler.ClearData(); + } + finally + { + await queueClient.CloseAsync(); + } + }); + + await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueName => + { + TestUtility.Log($"Queue: {queueName}, MaxConcurrentCalls: {maxConcurrentCalls}, Receive Mode: {mode.ToString()}, AutoComplete: {autoComplete}"); + var queueClient = new QueueClient(TestUtility.NamespaceConnectionString, queueName, mode); + try + { + var sessionHandlerOptions = + new SessionHandlerOptions(ExceptionReceivedHandler) + { + MaxConcurrentSessions = maxConcurrentCalls, + MessageWaitTimeout = TimeSpan.FromSeconds(5), + AutoComplete = autoComplete + }; + + var testSessionHandler = new TestSessionHandler( + queueClient.ReceiveMode, + sessionHandlerOptions, + queueClient.InnerSender, + queueClient.SessionPumpHost); + + // Send messages to Session first + await testSessionHandler.SendSessionMessages(); + + // Register handler + var count = 0; + testSessionHandler.RegisterSessionHandler( + async (session, message, token) => + { + await Task.Delay(TimeSpan.FromSeconds(8)); + TestUtility.Log($"Received Session: {session.SessionId} message: SequenceNumber: {message.SystemProperties.SequenceNumber}"); + + if (queueClient.ReceiveMode == ReceiveMode.PeekLock && !sessionHandlerOptions.AutoComplete) + { + await session.CompleteAsync(message.SystemProperties.LockToken); + } + Interlocked.Increment(ref count); + }, + sessionHandlerOptions); + + await Task.Delay(TimeSpan.FromSeconds(2)); + // UnregisterSessionHandler should wait up to the provided timeout to finish the message handling tasks + await testSessionHandler.UnregisterSessionHandler(TimeSpan.FromSeconds(10)); + Assert.True(count == maxConcurrentCalls); + + testSessionHandler.ClearData(); + } + finally + { + await queueClient.CloseAsync(); + } + }); - // Unregister handler to verify that receives should not happen - await testSessionHandler.UnregisterSessionHandler(); + await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueName => + { + TestUtility.Log($"Queue: {queueName}, MaxConcurrentCalls: {maxConcurrentCalls}, Receive Mode: {mode.ToString()}, AutoComplete: {autoComplete}"); + var queueClient = new QueueClient(TestUtility.NamespaceConnectionString, queueName, mode); + try + { + var sessionHandlerOptions = + new SessionHandlerOptions(ExceptionReceivedHandler) + { + MaxConcurrentSessions = maxConcurrentCalls, + MessageWaitTimeout = TimeSpan.FromSeconds(5), + AutoComplete = autoComplete + }; + + var testSessionHandler = new TestSessionHandler( + queueClient.ReceiveMode, + sessionHandlerOptions, + queueClient.InnerSender, + queueClient.SessionPumpHost); + + // Send messages to Session first await testSessionHandler.SendSessionMessages(); - await testSessionHandler.VerifySessionHandlerNotInvokedAndNoMessageReceived(); + + // Register handler + var count = 0; + testSessionHandler.RegisterSessionHandler( + async (session, message, token) => + { + await Task.Delay(TimeSpan.FromSeconds(8)); + TestUtility.Log($"Received Session: {session.SessionId} message: SequenceNumber: {message.SystemProperties.SequenceNumber}"); + + if (queueClient.ReceiveMode == ReceiveMode.PeekLock && !sessionHandlerOptions.AutoComplete) + { + await session.CompleteAsync(message.SystemProperties.LockToken); + } + Interlocked.Increment(ref count); + }, + sessionHandlerOptions); + + await Task.Delay(TimeSpan.FromSeconds(2)); + // UnregisterSessionHandler should wait up to the provided timeout to finish the message handling tasks + await testSessionHandler.UnregisterSessionHandler(TimeSpan.FromSeconds(2)); + Assert.True(count == 0); + + testSessionHandler.ClearData(); } finally { diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs index 8d605db5eb2a..065e6539f67b 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs @@ -6,6 +6,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests using System; using System.Collections.Generic; using System.Diagnostics; + using System.Threading; using System.Threading.Tasks; using Xunit; @@ -110,7 +111,7 @@ await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicN var sessionHandlerOptions = new SessionHandlerOptions(ExceptionReceivedHandler) { - MaxConcurrentSessions = 5, + MaxConcurrentSessions = maxConcurrentCalls, MessageWaitTimeout = TimeSpan.FromSeconds(5), AutoComplete = true }; @@ -131,11 +132,124 @@ await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicN await testSessionHandler.VerifyRun(); testSessionHandler.ClearData(); + } + finally + { + await subscriptionClient.CloseAsync(); + await topicClient.CloseAsync(); + } + }); + + await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicName, subscriptionName) => + { + TestUtility.Log($"Topic: {topicName}, MaxConcurrentCalls: {maxConcurrentCalls}, Receive Mode: {mode.ToString()}, AutoComplete: {autoComplete}"); + var topicClient = new TopicClient(TestUtility.NamespaceConnectionString, topicName); + var subscriptionClient = new SubscriptionClient( + TestUtility.NamespaceConnectionString, + topicClient.TopicName, + subscriptionName, + ReceiveMode.PeekLock); + + try + { + var sessionHandlerOptions = + new SessionHandlerOptions(ExceptionReceivedHandler) + { + MaxConcurrentSessions = maxConcurrentCalls, + MessageWaitTimeout = TimeSpan.FromSeconds(5), + AutoComplete = true + }; + + var testSessionHandler = new TestSessionHandler( + subscriptionClient.ReceiveMode, + sessionHandlerOptions, + topicClient.InnerSender, + subscriptionClient.SessionPumpHost); + + // Send messages to Session + await testSessionHandler.SendSessionMessages(); + + // Register handler + var count = 0; + testSessionHandler.RegisterSessionHandler( + async (session, message, token) => + { + await Task.Delay(TimeSpan.FromSeconds(8)); + TestUtility.Log($"Received Session: {session.SessionId} message: SequenceNumber: {message.SystemProperties.SequenceNumber}"); + + if (subscriptionClient.ReceiveMode == ReceiveMode.PeekLock && !sessionHandlerOptions.AutoComplete) + { + await session.CompleteAsync(message.SystemProperties.LockToken); + } + Interlocked.Increment(ref count); + }, + sessionHandlerOptions); + + await Task.Delay(TimeSpan.FromSeconds(2)); + // UnregisterSessionHandler should wait up to the provided timeout to finish the message handling tasks + await testSessionHandler.UnregisterSessionHandler(TimeSpan.FromSeconds(10)); + Assert.True(count == maxConcurrentCalls); + + testSessionHandler.ClearData(); + } + finally + { + await subscriptionClient.CloseAsync(); + await topicClient.CloseAsync(); + } + }); + + await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicName, subscriptionName) => + { + TestUtility.Log($"Topic: {topicName}, MaxConcurrentCalls: {maxConcurrentCalls}, Receive Mode: {mode.ToString()}, AutoComplete: {autoComplete}"); + var topicClient = new TopicClient(TestUtility.NamespaceConnectionString, topicName); + var subscriptionClient = new SubscriptionClient( + TestUtility.NamespaceConnectionString, + topicClient.TopicName, + subscriptionName, + ReceiveMode.PeekLock); + + try + { + var sessionHandlerOptions = + new SessionHandlerOptions(ExceptionReceivedHandler) + { + MaxConcurrentSessions = maxConcurrentCalls, + MessageWaitTimeout = TimeSpan.FromSeconds(5), + AutoComplete = true + }; + + var testSessionHandler = new TestSessionHandler( + subscriptionClient.ReceiveMode, + sessionHandlerOptions, + topicClient.InnerSender, + subscriptionClient.SessionPumpHost); - // Unregister handler to verify that receives should not happen - await testSessionHandler.UnregisterSessionHandler(); + // Send messages to Session await testSessionHandler.SendSessionMessages(); - await testSessionHandler.VerifySessionHandlerNotInvokedAndNoMessageReceived(); + + // Register handler + var count = 0; + testSessionHandler.RegisterSessionHandler( + async (session, message, token) => + { + await Task.Delay(TimeSpan.FromSeconds(8)); + TestUtility.Log($"Received Session: {session.SessionId} message: SequenceNumber: {message.SystemProperties.SequenceNumber}"); + + if (subscriptionClient.ReceiveMode == ReceiveMode.PeekLock && !sessionHandlerOptions.AutoComplete) + { + await session.CompleteAsync(message.SystemProperties.LockToken); + } + Interlocked.Increment(ref count); + }, + sessionHandlerOptions); + + await Task.Delay(TimeSpan.FromSeconds(2)); + // UnregisterSessionHandler should wait up to the provided timeout to finish the message handling tasks + await testSessionHandler.UnregisterSessionHandler(TimeSpan.FromSeconds(2)); + Assert.True(count == 0); + + testSessionHandler.ClearData(); } finally { diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs index ead79b6d5d9f..9bd6244d8a9c 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/SenderReceiverClientTestBase.cs @@ -253,11 +253,11 @@ internal async Task OnMessageAsyncTestCase( async (message, token) => { TestUtility.Log($"Received message: SequenceNumber: {message.SystemProperties.SequenceNumber}"); - Interlocked.Increment(ref count); if (messageReceiver.ReceiveMode == ReceiveMode.PeekLock && !autoComplete) { await messageReceiver.CompleteAsync(message.SystemProperties.LockToken); } + Interlocked.Increment(ref count); }, new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = maxConcurrentCalls, AutoComplete = autoComplete }); @@ -273,22 +273,62 @@ internal async Task OnMessageAsyncTestCase( await Task.Delay(TimeSpan.FromSeconds(5)); } Assert.True(count == messageCount); + } - // Clear count and unregister handler to verify that receives should not happen - count = 0; - await messageReceiver.UnregisterMessageHandlerAsync(); + internal async Task OnMessageAsyncUnregisterHandlerLongTimeoutTestCase( + IMessageSender messageSender, + IMessageReceiver messageReceiver, + int maxConcurrentCalls, + bool autoComplete, + int messageCount) + { + var count = 0; await TestUtility.SendMessagesAsync(messageSender, messageCount); - stopwatch = Stopwatch.StartNew(); - while (stopwatch.Elapsed.TotalSeconds <= 60) - { - if (count == messageCount) + messageReceiver.RegisterMessageHandler( + async (message, token) => { - TestUtility.Log($"All '{messageCount}' messages Received."); - break; - } - await Task.Delay(TimeSpan.FromSeconds(5)); - } - Assert.True(count == 0); + TestUtility.Log($"Received message: SequenceNumber: {message.SystemProperties.SequenceNumber}"); + await Task.Delay(TimeSpan.FromSeconds(8)); + if (messageReceiver.ReceiveMode == ReceiveMode.PeekLock && !autoComplete) + { + await messageReceiver.CompleteAsync(message.SystemProperties.LockToken); + } + Interlocked.Increment(ref count); + + }, + new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = maxConcurrentCalls, AutoComplete = autoComplete }); + + await Task.Delay(TimeSpan.FromSeconds(2)); + // UnregisterMessageHandlerAsync should wait up to the provided timeout to finish the message handling tasks + await messageReceiver.UnregisterMessageHandlerAsync(TimeSpan.FromSeconds(20)); + Assert.True(count == maxConcurrentCalls); + } + + internal async Task OnMessageAsyncUnregisterHandlerShortTimeoutTestCase( + IMessageSender messageSender, + IMessageReceiver messageReceiver, + int maxConcurrentCalls, + bool autoComplete, + int messageCount) + { + var count = 0; + await TestUtility.SendMessagesAsync(messageSender, messageCount); + messageReceiver.RegisterMessageHandler( + async (message, token) => + { + TestUtility.Log($"Received message: SequenceNumber: {message.SystemProperties.SequenceNumber}"); + await Task.Delay(TimeSpan.FromSeconds(8)); + if (messageReceiver.ReceiveMode == ReceiveMode.PeekLock && !autoComplete) + { + await messageReceiver.CompleteAsync(message.SystemProperties.LockToken); + } + Interlocked.Increment(ref count); + }, + new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = maxConcurrentCalls, AutoComplete = autoComplete }); + + await Task.Delay(TimeSpan.FromSeconds(2)); + await messageReceiver.UnregisterMessageHandlerAsync(TimeSpan.FromSeconds(2)); + Assert.True(count == 0); } internal async Task OnMessageRegistrationWithoutPendingMessagesTestCase( diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/TestSessionHandler.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/TestSessionHandler.cs index 4453a1854641..53c251f4ad94 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/TestSessionHandler.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/TestSessionHandler.cs @@ -37,14 +37,19 @@ public TestSessionHandler( this.sessionMessageMap = new ConcurrentDictionary(); } + public void RegisterSessionHandler(Func handler, SessionHandlerOptions handlerOptions) + { + this.sessionPumpHost.OnSessionHandler(handler, handlerOptions); + } + public void RegisterSessionHandler(SessionHandlerOptions handlerOptions) { this.sessionPumpHost.OnSessionHandler(this.OnSessionHandler, this.sessionHandlerOptions); } - public async Task UnregisterSessionHandler() + public async Task UnregisterSessionHandler(TimeSpan inflightSessionHandlerTasksWaitTimeout) { - await this.sessionPumpHost.UnregisterSessionHandlerAsync().ConfigureAwait(false); + await this.sessionPumpHost.UnregisterSessionHandlerAsync(inflightSessionHandlerTasksWaitTimeout).ConfigureAwait(false); } public async Task SendSessionMessages() @@ -91,24 +96,6 @@ public async Task VerifyRun() Assert.True(this.totalMessageCount == MessagesPerSession * NumberOfSessions); } - public async Task VerifySessionHandlerNotInvokedAndNoMessageReceived() - { - // Wait for the OnMessage Tasks to finish - var stopwatch = Stopwatch.StartNew(); - while (stopwatch.Elapsed.TotalSeconds <= 180) - { - if (this.totalMessageCount == MessagesPerSession * NumberOfSessions) - { - TestUtility.Log($"All '{this.totalMessageCount}' messages Received."); - break; - } - await Task.Delay(TimeSpan.FromSeconds(5)); - } - - Assert.True(this.sessionMessageMap.Keys.Count == 0); - Assert.True(this.totalMessageCount == 0); - } - public void ClearData() { this.totalMessageCount = 0; From ffc42498efb7e6065563218961dcbdb936eb5e19 Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Thu, 3 Sep 2020 17:19:19 -0700 Subject: [PATCH 16/22] nit --- .../src/Core/IInnerSubscriptionClient.cs | 1 - .../Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs | 2 ++ .../tests/OnSessionTopicSubscriptionTests.cs | 2 ++ 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IInnerSubscriptionClient.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IInnerSubscriptionClient.cs index 8251ae612de2..0dc3e0d3be6e 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IInnerSubscriptionClient.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IInnerSubscriptionClient.cs @@ -3,7 +3,6 @@ namespace Microsoft.Azure.ServiceBus.Core { - using System; using System.Collections.Generic; using System.Threading.Tasks; diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs index 90a62d6c4204..9380a51d426f 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionQueueTests.cs @@ -196,6 +196,7 @@ await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueNa } }); + // test UnregisterSessionHandler can wait for message handling upto the timeout user defined. await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueName => { TestUtility.Log($"Queue: {queueName}, MaxConcurrentCalls: {maxConcurrentCalls}, Receive Mode: {mode.ToString()}, AutoComplete: {autoComplete}"); @@ -248,6 +249,7 @@ await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueNa } }); + // test UnregisterSessionHandler can close down in time when message handling takes longer than wait timeout user defined. await ServiceBusScope.UsingQueueAsync(partitioned, sessionEnabled, async queueName => { TestUtility.Log($"Queue: {queueName}, MaxConcurrentCalls: {maxConcurrentCalls}, Receive Mode: {mode.ToString()}, AutoComplete: {autoComplete}"); diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs index 065e6539f67b..a2eab7f9a65c 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/OnSessionTopicSubscriptionTests.cs @@ -140,6 +140,7 @@ await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicN } }); + // test UnregisterSessionHandler can wait for message handling upto the timeout user defined. await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicName, subscriptionName) => { TestUtility.Log($"Topic: {topicName}, MaxConcurrentCalls: {maxConcurrentCalls}, Receive Mode: {mode.ToString()}, AutoComplete: {autoComplete}"); @@ -199,6 +200,7 @@ await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicN } }); + // test UnregisterSessionHandler can close down in time when message handling takes longer than wait timeout user defined. await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicName, subscriptionName) => { TestUtility.Log($"Topic: {topicName}, MaxConcurrentCalls: {maxConcurrentCalls}, Receive Mode: {mode.ToString()}, AutoComplete: {autoComplete}"); From fbd31d0594772d8c8459146c32cc91106d55d0b4 Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Fri, 4 Sep 2020 16:33:41 -0700 Subject: [PATCH 17/22] cancel runningTaskCancellationTokenSource after unregister is done --- .../Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs index 0d7fcea6e203..52b8e10a0b39 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageReceiver.cs @@ -941,6 +941,8 @@ public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerT lock (this.messageReceivePumpSyncLock) { + this.runningTaskCancellationTokenSource.Cancel(); + this.runningTaskCancellationTokenSource.Dispose(); this.receivePump = null; } MessagingEventSource.Log.UnregisterMessageHandlerStop(this.ClientId); @@ -1330,7 +1332,6 @@ protected virtual void OnMessageHandler( this.receivePumpCancellationTokenSource = new CancellationTokenSource(); - // Running task cancellation token source can be reused if previously UnregisterMessageHandlerAsync was called if (this.runningTaskCancellationTokenSource == null) { this.runningTaskCancellationTokenSource = new CancellationTokenSource(); @@ -1352,8 +1353,8 @@ protected virtual void OnMessageHandler( { this.receivePumpCancellationTokenSource.Cancel(); this.receivePumpCancellationTokenSource.Dispose(); - //this.runningTaskCancellationTokenSource.Cancel(); - //this.runningTaskCancellationTokenSource.Dispose(); + this.runningTaskCancellationTokenSource.Cancel(); + this.runningTaskCancellationTokenSource.Dispose(); this.receivePump = null; } } From f4dac4b2dd6387bf923edb78057a1fc2b807e212 Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Wed, 9 Sep 2020 12:04:26 -0700 Subject: [PATCH 18/22] change public API --- .../API/ApiApprovals.ApproveAzureServiceBus.approved.txt | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index 17ab0ce8b698..7c5b28ccaf77 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -113,6 +113,7 @@ namespace Microsoft.Azure.ServiceBus string QueueName { get; } void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions); void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler); + Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout); } public interface ISessionClient : Microsoft.Azure.ServiceBus.IClientEntity { @@ -131,6 +132,7 @@ namespace Microsoft.Azure.ServiceBus System.Threading.Tasks.Task> GetRulesAsync(); void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions); void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler); + Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout); System.Threading.Tasks.Task RemoveRuleAsync(string ruleName); } public interface ITopicClient : Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity @@ -235,9 +237,11 @@ namespace Microsoft.Azure.ServiceBus protected override System.Threading.Tasks.Task OnClosingAsync() { } public void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions) { } public void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler) { } + public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions) { } public void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler) { } + public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout) { } public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } @@ -451,9 +455,11 @@ namespace Microsoft.Azure.ServiceBus protected override System.Threading.Tasks.Task OnClosingAsync() { } public void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions) { } public void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler) { } + public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions) { } public void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler) { } + public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout) { } public System.Threading.Tasks.Task RemoveRuleAsync(string ruleName) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } @@ -528,6 +534,7 @@ namespace Microsoft.Azure.ServiceBus.Core System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null); void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions); void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler); + Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout); } public interface ISenderClient : Microsoft.Azure.ServiceBus.IClientEntity { @@ -577,6 +584,7 @@ namespace Microsoft.Azure.ServiceBus.Core public System.Threading.Tasks.Task ReceiveDeferredMessageAsync(long sequenceNumber) { } public void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions) { } public void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler) { } + public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public System.Threading.Tasks.Task RenewLockAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task RenewLockAsync(string lockToken) { } From 96921222681773a54ed46a549a72a838718ab8d5 Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Wed, 9 Sep 2020 14:57:09 -0700 Subject: [PATCH 19/22] update the API header --- ...Approvals.ApproveAzureServiceBus.approved.txt | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index 7c5b28ccaf77..da182e4dd505 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -113,7 +113,7 @@ namespace Microsoft.Azure.ServiceBus string QueueName { get; } void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions); void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler); - Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout); + Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout); } public interface ISessionClient : Microsoft.Azure.ServiceBus.IClientEntity { @@ -132,7 +132,7 @@ namespace Microsoft.Azure.ServiceBus System.Threading.Tasks.Task> GetRulesAsync(); void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions); void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler); - Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout); + Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout); System.Threading.Tasks.Task RemoveRuleAsync(string ruleName); } public interface ITopicClient : Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity @@ -237,11 +237,11 @@ namespace Microsoft.Azure.ServiceBus protected override System.Threading.Tasks.Task OnClosingAsync() { } public void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions) { } public void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler) { } - public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout) { } + public async Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions) { } public void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler) { } - public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout) { } + public async Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout) { } public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } @@ -455,11 +455,11 @@ namespace Microsoft.Azure.ServiceBus protected override System.Threading.Tasks.Task OnClosingAsync() { } public void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions) { } public void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler) { } - public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout) { } + public async Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions) { } public void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler) { } - public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout) { } + public async Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout) { } public System.Threading.Tasks.Task RemoveRuleAsync(string ruleName) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } @@ -534,7 +534,7 @@ namespace Microsoft.Azure.ServiceBus.Core System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null); void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions); void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler); - Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout); + Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout); } public interface ISenderClient : Microsoft.Azure.ServiceBus.IClientEntity { @@ -584,7 +584,7 @@ namespace Microsoft.Azure.ServiceBus.Core public System.Threading.Tasks.Task ReceiveDeferredMessageAsync(long sequenceNumber) { } public void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions) { } public void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler) { } - public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout) { } + public async Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public System.Threading.Tasks.Task RenewLockAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task RenewLockAsync(string lockToken) { } From 7741ad954d29a9694ab34dbf5a40ef1a5d24f7eb Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Wed, 9 Sep 2020 17:07:33 -0700 Subject: [PATCH 20/22] update the API definition --- ...Approvals.ApproveAzureServiceBus.approved.txt | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index da182e4dd505..cc013325085d 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -113,7 +113,7 @@ namespace Microsoft.Azure.ServiceBus string QueueName { get; } void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions); void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler); - Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout); + System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout); } public interface ISessionClient : Microsoft.Azure.ServiceBus.IClientEntity { @@ -132,7 +132,7 @@ namespace Microsoft.Azure.ServiceBus System.Threading.Tasks.Task> GetRulesAsync(); void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions); void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler); - Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout); + System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout); System.Threading.Tasks.Task RemoveRuleAsync(string ruleName); } public interface ITopicClient : Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity @@ -237,11 +237,11 @@ namespace Microsoft.Azure.ServiceBus protected override System.Threading.Tasks.Task OnClosingAsync() { } public void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions) { } public void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler) { } - public async Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } + public System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions) { } public void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler) { } - public async Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout) { } + public System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout) { } public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } @@ -455,11 +455,11 @@ namespace Microsoft.Azure.ServiceBus protected override System.Threading.Tasks.Task OnClosingAsync() { } public void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions) { } public void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler) { } - public async Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } + public System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions) { } public void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler) { } - public async Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout) { } + public System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout) { } public System.Threading.Tasks.Task RemoveRuleAsync(string ruleName) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } @@ -534,7 +534,7 @@ namespace Microsoft.Azure.ServiceBus.Core System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null); void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions); void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler); - Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout); + System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout); } public interface ISenderClient : Microsoft.Azure.ServiceBus.IClientEntity { @@ -584,7 +584,7 @@ namespace Microsoft.Azure.ServiceBus.Core public System.Threading.Tasks.Task ReceiveDeferredMessageAsync(long sequenceNumber) { } public void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions) { } public void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler) { } - public async Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } + public System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public System.Threading.Tasks.Task RenewLockAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task RenewLockAsync(string lockToken) { } From 2bb96a55b654b39f47cdb6738e55a18e6f65418f Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Wed, 9 Sep 2020 18:17:18 -0700 Subject: [PATCH 21/22] fix spacing --- .../API/ApiApprovals.ApproveAzureServiceBus.approved.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index cc013325085d..eece841e37a8 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -113,7 +113,7 @@ namespace Microsoft.Azure.ServiceBus string QueueName { get; } void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions); void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler); - System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout); + System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout); } public interface ISessionClient : Microsoft.Azure.ServiceBus.IClientEntity { @@ -132,7 +132,7 @@ namespace Microsoft.Azure.ServiceBus System.Threading.Tasks.Task> GetRulesAsync(); void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions); void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler); - System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout); + System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout); System.Threading.Tasks.Task RemoveRuleAsync(string ruleName); } public interface ITopicClient : Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity @@ -534,7 +534,7 @@ namespace Microsoft.Azure.ServiceBus.Core System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null); void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions); void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler); - System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout); + System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout); } public interface ISenderClient : Microsoft.Azure.ServiceBus.IClientEntity { From c0ec74a76aee5bea8614c412bef3cf9e12b4641e Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Wed, 9 Sep 2020 19:03:30 -0700 Subject: [PATCH 22/22] fix ApproveAzureServiceBus CIT test --- .../ApiApprovals.ApproveAzureServiceBus.approved.txt | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index eece841e37a8..f0e3b2ae9e7c 100644 --- a/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/sdk/servicebus/Microsoft.Azure.ServiceBus/tests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -132,8 +132,8 @@ namespace Microsoft.Azure.ServiceBus System.Threading.Tasks.Task> GetRulesAsync(); void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions); void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler); - System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout); System.Threading.Tasks.Task RemoveRuleAsync(string ruleName); + System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout); } public interface ITopicClient : Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity { @@ -237,15 +237,15 @@ namespace Microsoft.Azure.ServiceBus protected override System.Threading.Tasks.Task OnClosingAsync() { } public void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions) { } public void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler) { } - public System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions) { } public void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler) { } - public System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout) { } public System.Threading.Tasks.Task ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { } public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList messageList) { } + public System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void UnregisterPlugin(string serviceBusPluginName) { } + public System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout) { } } public sealed class QuotaExceededException : Microsoft.Azure.ServiceBus.ServiceBusException { @@ -455,13 +455,13 @@ namespace Microsoft.Azure.ServiceBus protected override System.Threading.Tasks.Task OnClosingAsync() { } public void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions) { } public void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler) { } - public System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public void RegisterSessionHandler(System.Func handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions) { } public void RegisterSessionHandler(System.Func handler, System.Func exceptionReceivedHandler) { } - public System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout) { } public System.Threading.Tasks.Task RemoveRuleAsync(string ruleName) { } + public System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void UnregisterPlugin(string serviceBusPluginName) { } + public System.Threading.Tasks.Task UnregisterSessionHandlerAsync(System.TimeSpan inflightSessionHandlerTasksWaitTimeout) { } } public class TopicClient : Microsoft.Azure.ServiceBus.ClientEntity, Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity, Microsoft.Azure.ServiceBus.ITopicClient { @@ -584,10 +584,10 @@ namespace Microsoft.Azure.ServiceBus.Core public System.Threading.Tasks.Task ReceiveDeferredMessageAsync(long sequenceNumber) { } public void RegisterMessageHandler(System.Func handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions) { } public void RegisterMessageHandler(System.Func handler, System.Func exceptionReceivedHandler) { } - public System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { } public System.Threading.Tasks.Task RenewLockAsync(Microsoft.Azure.ServiceBus.Message message) { } public System.Threading.Tasks.Task RenewLockAsync(string lockToken) { } + public System.Threading.Tasks.Task UnregisterMessageHandlerAsync(System.TimeSpan inflightMessageHandlerTasksWaitTimeout) { } public override void UnregisterPlugin(string serviceBusPluginName) { } } public class MessageSender : Microsoft.Azure.ServiceBus.ClientEntity, Microsoft.Azure.ServiceBus.Core.IMessageSender, Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity