diff --git a/Test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs b/Test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs index 14da93396..fb79f6b85 100644 --- a/Test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs +++ b/Test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs @@ -21,11 +21,14 @@ namespace DurableTask.AzureStorage.Tests using System.Threading.Tasks; using DurableTask.AzureStorage.Messaging; using DurableTask.AzureStorage.Monitoring; + using DurableTask.AzureStorage.Partitioning; using DurableTask.AzureStorage.Tracking; using DurableTask.Core; + using DurableTask.Core.History; using Microsoft.VisualStudio.TestTools.UnitTesting; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Blob; + using Microsoft.WindowsAzure.Storage.Queue; using Microsoft.WindowsAzure.Storage.Table; /// @@ -172,11 +175,11 @@ public async Task> ListBlobsAsync(CloudBlobDirectory client) return results; } -/// -/// REQUIREMENT: Workers can be added or removed at any time and control-queue partitions are load-balanced automatically. -/// REQUIREMENT: No two workers will ever process the same control queue. -/// -[TestMethod] + /// + /// REQUIREMENT: Workers can be added or removed at any time and control-queue partitions are load-balanced automatically. + /// REQUIREMENT: No two workers will ever process the same control queue. + /// + [TestMethod] public async Task MultiWorkerLeaseMovement() { const int MaxWorkerCount = 4; @@ -371,6 +374,76 @@ public async Task TestInstanceAndMessageDistribution() } } + /// + /// If a partition is lost, verify that all pre-fetched messages associated + /// with that partition are abandoned and not processed. + /// + [TestMethod] + public async Task PartitionLost_AbandonPrefetchedSession() + { + var settings = new AzureStorageOrchestrationServiceSettings() + { + PartitionCount = 1, + LeaseRenewInterval = TimeSpan.FromMilliseconds(500), + TaskHubName = TestHelpers.GetTestTaskHubName(), + StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(), + ControlQueueBufferThreshold = 100, + }; + + // STEP 1: Start up the service and queue up a large number of messages + var service = new AzureStorageOrchestrationService(settings); + await service.CreateAsync(); + await service.StartAsync(); + + // These instance IDs are set up specifically to bypass message validation logic + // that might otherwise discard these messages as out-of-order, invalid, etc. + var sourceInstance = new OrchestrationInstance(); + var targetInstance = new OrchestrationInstance { InstanceId = "@counter@xyz" }; + + await TestHelpers.WaitFor( + condition: () => service.OwnedControlQueues.Any(), + timeout: TimeSpan.FromSeconds(10)); + ControlQueue controlQueue = service.OwnedControlQueues.Single(); + + List messages = Enumerable.Range(0, 100).Select(i => new TaskMessage + { + Event = new EventRaisedEvent(-1, null), + SequenceNumber = i, + OrchestrationInstance = targetInstance, + }).ToList(); + + await messages.ParallelForEachAsync( + maxConcurrency: 50, + action: msg => controlQueue.AddMessageAsync(msg, sourceInstance)); + + // STEP 2: Force the lease to be stolen and wait for the lease status to update. + // The orchestration service should detect this and update its state. + BlobLease lease = (await service.ListBlobLeasesAsync()).Single(); + await lease.Blob.ChangeLeaseAsync( + proposedLeaseId: Guid.NewGuid().ToString(), + accessCondition: AccessCondition.GenerateLeaseCondition(lease.Token)); + await TestHelpers.WaitFor( + condition: () => !service.OwnedControlQueues.Any(), + timeout: TimeSpan.FromSeconds(10)); + + // Small additional delay to account for tiny race condition between OwnedControlQueues being updated + // and LockNextTaskOrchestrationWorkItemAsync being able to react to that change. + await Task.Delay(250); + + // STEP 3: Try to get an orchestration work item - a null value should be returned + // because the lease was lost. + var workItem = await service.LockNextTaskOrchestrationWorkItemAsync( + TimeSpan.FromMinutes(5), + CancellationToken.None); + Assert.IsNull(workItem); + + // STEP 4: Verify that all the enqueued messages were abandoned, i.e. put back + // onto the queue with their dequeue counts incremented. + IEnumerable queueMessages = + await controlQueue.InnerQueue.PeekMessagesAsync(settings.ControlQueueBatchSize); + Assert.IsTrue(queueMessages.All(msg => msg.DequeueCount == 1)); + } + [TestMethod] public async Task MonitorIdleTaskHubDisconnected() { diff --git a/Test/DurableTask.AzureStorage.Tests/TestHelpers.cs b/Test/DurableTask.AzureStorage.Tests/TestHelpers.cs index d6b13b4c2..92559eb4b 100644 --- a/Test/DurableTask.AzureStorage.Tests/TestHelpers.cs +++ b/Test/DurableTask.AzureStorage.Tests/TestHelpers.cs @@ -15,6 +15,8 @@ namespace DurableTask.AzureStorage.Tests { using System; using System.Configuration; + using System.Diagnostics; + using System.Threading.Tasks; static class TestHelpers { @@ -28,7 +30,7 @@ public static TestOrchestrationHost GetTestOrchestrationHost( var settings = new AzureStorageOrchestrationServiceSettings { StorageConnectionString = storageConnectionString, - TaskHubName = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None).AppSettings.Settings["TaskHubName"].Value, + TaskHubName = GetTestTaskHubName(), ExtendedSessionsEnabled = enableExtendedSessions, ExtendedSessionIdleTimeout = TimeSpan.FromSeconds(extendedSessionTimeoutInSeconds), FetchLargeMessageDataEnabled = fetchLargeMessages, @@ -48,6 +50,12 @@ public static string GetTestStorageAccountConnectionString() return storageConnectionString; } + public static string GetTestTaskHubName() + { + Configuration appConfig = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None); + return appConfig.AppSettings.Settings["TaskHubName"].Value; + } + static string GetTestSetting(string name) { string value = Environment.GetEnvironmentVariable("DurableTaskTest" + name); @@ -58,5 +66,23 @@ static string GetTestSetting(string name) return value; } + + public static async Task WaitFor(Func condition, TimeSpan timeout) + { + Stopwatch timer = Stopwatch.StartNew(); + do + { + bool result = condition(); + if (result) + { + return; + } + + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + } while (timer.Elapsed < timeout); + + throw new TimeoutException("Timed out waiting for condition to be true."); + } } } diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 1c9ce2529..0e1e71fc9 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -610,6 +610,13 @@ public async Task LockNextTaskOrchestrationWorkItemAs return null; } + // Make sure we still own the partition. If not, abandon the session. + if (session.ControlQueue.IsReleased) + { + await this.AbandonAndReleaseSessionAsync(session); + return null; + } + session.StartNewLogicalTraceScope(); List outOfOrderMessages = null; diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index 62344bc14..625f46ef5 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -221,6 +221,10 @@ public override Task DeleteMessageAsync(MessageData message, SessionBase session public void Release() { this.releaseTokenSource.Cancel(); + + // Note that we also set IsReleased to true when the dequeue loop ends, so this is + // somewhat redundant. This one was added mostly to make tests run more predictably. + this.IsReleased = true; } public virtual void Dispose() diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index b84345a65..fafae9e24 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -179,9 +179,9 @@ await this.storageQueue.AddMessageAsync( // We assume that auto-started orchestrations (i.e. instance ids starting with '@') // are used exclusively by durable entities; so we can follow // a custom naming convention to pass a time parameter. - var eventName = eventRaisedEvent.Name; - if (eventName.Length >= 3 && eventName[2] == '@' - && DateTime.TryParse(eventRaisedEvent.Name.Substring(3), out var scheduledTime)) + string eventName = eventRaisedEvent.Name; + if (eventName != null && eventName.Length >= 3 && eventName[2] == '@' + && DateTime.TryParse(eventName.Substring(3), out DateTime scheduledTime)) { initialVisibilityDelay = scheduledTime.ToUniversalTime() - DateTime.UtcNow; if (initialVisibilityDelay < TimeSpan.Zero) diff --git a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs index 2c68f89a3..d56d6bf7b 100644 --- a/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs +++ b/src/DurableTask.AzureStorage/OrchestrationSessionManager.cs @@ -23,7 +23,6 @@ namespace DurableTask.AzureStorage using DurableTask.AzureStorage.Monitoring; using DurableTask.AzureStorage.Tracking; using DurableTask.Core; - using Microsoft.WindowsAzure.Storage; class OrchestrationSessionManager : IDisposable {