Skip to content

Commit

Permalink
Azure Storage: Abandon prefetched orchestrator messages if the lease …
Browse files Browse the repository at this point in the history
…is lost (#360)
  • Loading branch information
cgillum authored Jan 6, 2020
1 parent e213c2c commit f14720a
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 10 deletions.
83 changes: 78 additions & 5 deletions Test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ namespace DurableTask.AzureStorage.Tests
using System.Threading.Tasks;
using DurableTask.AzureStorage.Messaging;
using DurableTask.AzureStorage.Monitoring;
using DurableTask.AzureStorage.Partitioning;
using DurableTask.AzureStorage.Tracking;
using DurableTask.Core;
using DurableTask.Core.History;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Queue;
using Microsoft.WindowsAzure.Storage.Table;

/// <summary>
Expand Down Expand Up @@ -172,11 +175,11 @@ public async Task<List<IListBlobItem>> ListBlobsAsync(CloudBlobDirectory client)
return results;
}

/// <summary>
/// REQUIREMENT: Workers can be added or removed at any time and control-queue partitions are load-balanced automatically.
/// REQUIREMENT: No two workers will ever process the same control queue.
/// </summary>
[TestMethod]
/// <summary>
/// REQUIREMENT: Workers can be added or removed at any time and control-queue partitions are load-balanced automatically.
/// REQUIREMENT: No two workers will ever process the same control queue.
/// </summary>
[TestMethod]
public async Task MultiWorkerLeaseMovement()
{
const int MaxWorkerCount = 4;
Expand Down Expand Up @@ -371,6 +374,76 @@ public async Task TestInstanceAndMessageDistribution()
}
}

/// <summary>
/// Verifies that when the service loses its only partition lease, the messages it had
/// already pre-fetched for that partition are abandoned instead of being handed out
/// as an orchestration work item.
/// </summary>
[TestMethod]
public async Task PartitionLost_AbandonPrefetchedSession()
{
    const int MessageCount = 100;
    TimeSpan ownershipChangeTimeout = TimeSpan.FromSeconds(10);

    AzureStorageOrchestrationServiceSettings settings = new AzureStorageOrchestrationServiceSettings();
    settings.PartitionCount = 1;
    settings.LeaseRenewInterval = TimeSpan.FromMilliseconds(500);
    settings.TaskHubName = TestHelpers.GetTestTaskHubName();
    settings.StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString();
    settings.ControlQueueBufferThreshold = 100;

    // STEP 1: Bring the service online and flood its single control queue with messages.
    AzureStorageOrchestrationService service = new AzureStorageOrchestrationService(settings);
    await service.CreateAsync();
    await service.StartAsync();

    // The instance IDs below are chosen so that message validation logic does not
    // discard the messages as out-of-order, invalid, etc.
    OrchestrationInstance sourceInstance = new OrchestrationInstance();
    OrchestrationInstance targetInstance = new OrchestrationInstance { InstanceId = "@counter@xyz" };

    // Wait until the service has taken ownership of the (only) control queue.
    await TestHelpers.WaitFor(
        condition: () => service.OwnedControlQueues.Any(),
        timeout: ownershipChangeTimeout);
    ControlQueue controlQueue = service.OwnedControlQueues.Single();

    List<TaskMessage> messages = new List<TaskMessage>(MessageCount);
    for (int sequenceNumber = 0; sequenceNumber < MessageCount; sequenceNumber++)
    {
        messages.Add(new TaskMessage
        {
            Event = new EventRaisedEvent(-1, null),
            SequenceNumber = sequenceNumber,
            OrchestrationInstance = targetInstance,
        });
    }

    await messages.ParallelForEachAsync(
        maxConcurrency: 50,
        action: message => controlQueue.AddMessageAsync(message, sourceInstance));

    // STEP 2: Steal the lease by changing its ID behind the service's back, then wait
    //         for the service to notice that it no longer owns any control queue.
    var blobLeases = await service.ListBlobLeasesAsync();
    BlobLease ownedLease = blobLeases.Single();
    string stolenLeaseId = Guid.NewGuid().ToString();
    AccessCondition currentLeaseCondition = AccessCondition.GenerateLeaseCondition(ownedLease.Token);
    await ownedLease.Blob.ChangeLeaseAsync(
        proposedLeaseId: stolenLeaseId,
        accessCondition: currentLeaseCondition);

    await TestHelpers.WaitFor(
        condition: () => !service.OwnedControlQueues.Any(),
        timeout: ownershipChangeTimeout);

    // There is a tiny race between OwnedControlQueues being updated and
    // LockNextTaskOrchestrationWorkItemAsync being able to react to that change,
    // so give it a short moment to settle.
    await Task.Delay(250);

    // STEP 3: Ask for an orchestration work item. Because the lease was lost,
    //         nothing should be returned.
    TaskOrchestrationWorkItem workItem = await service.LockNextTaskOrchestrationWorkItemAsync(
        TimeSpan.FromMinutes(5),
        CancellationToken.None);
    Assert.IsNull(workItem);

    // STEP 4: Every enqueued message should have been abandoned, i.e. put back onto
    //         the queue with its dequeue count incremented.
    IEnumerable<CloudQueueMessage> queueMessages =
        await controlQueue.InnerQueue.PeekMessagesAsync(settings.ControlQueueBatchSize);
    bool allDequeuedExactlyOnce = queueMessages.All(queueMessage => queueMessage.DequeueCount == 1);
    Assert.IsTrue(allDequeuedExactlyOnce);
}

[TestMethod]
public async Task MonitorIdleTaskHubDisconnected()
{
Expand Down
28 changes: 27 additions & 1 deletion Test/DurableTask.AzureStorage.Tests/TestHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ namespace DurableTask.AzureStorage.Tests
{
using System;
using System.Configuration;
using System.Diagnostics;
using System.Threading.Tasks;

static class TestHelpers
{
Expand All @@ -28,7 +30,7 @@ public static TestOrchestrationHost GetTestOrchestrationHost(
var settings = new AzureStorageOrchestrationServiceSettings
{
StorageConnectionString = storageConnectionString,
TaskHubName = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None).AppSettings.Settings["TaskHubName"].Value,
TaskHubName = GetTestTaskHubName(),
ExtendedSessionsEnabled = enableExtendedSessions,
ExtendedSessionIdleTimeout = TimeSpan.FromSeconds(extendedSessionTimeoutInSeconds),
FetchLargeMessageDataEnabled = fetchLargeMessages,
Expand All @@ -48,6 +50,12 @@ public static string GetTestStorageAccountConnectionString()
return storageConnectionString;
}

/// <summary>
/// Reads the task hub name used by the tests from the "TaskHubName" entry in the
/// application settings of the test executable's configuration file.
/// </summary>
/// <returns>The value of the "TaskHubName" app setting.</returns>
/// <exception cref="InvalidOperationException">
/// The "TaskHubName" app setting is not present in the configuration file.
/// </exception>
public static string GetTestTaskHubName()
{
    const string SettingName = "TaskHubName";

    Configuration appConfig = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);

    // The settings indexer yields null for a key that is not configured. Fail with a
    // descriptive message instead of letting ".Value" surface as a NullReferenceException.
    KeyValueConfigurationElement setting = appConfig.AppSettings.Settings[SettingName];
    if (setting == null)
    {
        throw new InvalidOperationException(
            "The '" + SettingName + "' app setting is missing from the test configuration file.");
    }

    return setting.Value;
}

static string GetTestSetting(string name)
{
string value = Environment.GetEnvironmentVariable("DurableTaskTest" + name);
Expand All @@ -58,5 +66,23 @@ static string GetTestSetting(string name)

return value;
}

/// <summary>
/// Polls <paramref name="condition"/> about every 100 milliseconds until it returns
/// <c>true</c> or <paramref name="timeout"/> has elapsed.
/// </summary>
/// <param name="condition">The predicate to evaluate; it is always evaluated at least once.</param>
/// <param name="timeout">How long to keep polling before giving up.</param>
/// <returns>A task that completes once <paramref name="condition"/> has returned <c>true</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="condition"/> is null.</exception>
/// <exception cref="TimeoutException">
/// <paramref name="condition"/> was still false when evaluated after the timeout elapsed.
/// </exception>
/// <remarks>
/// Because this is an async method, both exceptions are reported through the returned task.
/// </remarks>
public static async Task WaitFor(Func<bool> condition, TimeSpan timeout)
{
    if (condition == null)
    {
        throw new ArgumentNullException(nameof(condition));
    }

    TimeSpan pollInterval = TimeSpan.FromMilliseconds(100);
    Stopwatch timer = Stopwatch.StartNew();

    while (true)
    {
        if (condition())
        {
            return;
        }

        // The deadline is only examined right after a failed evaluation. This guarantees
        // one last check after the final delay, so a condition that became true while we
        // were sleeping is not misreported as a timeout.
        if (timer.Elapsed >= timeout)
        {
            break;
        }

        await Task.Delay(pollInterval);
    }

    throw new TimeoutException("Timed out waiting for condition to be true.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,13 @@ public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAs
return null;
}

// Make sure we still own the partition. If not, abandon the session.
if (session.ControlQueue.IsReleased)
{
await this.AbandonAndReleaseSessionAsync(session);
return null;
}

session.StartNewLogicalTraceScope();

List<MessageData> outOfOrderMessages = null;
Expand Down
4 changes: 4 additions & 0 deletions src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ public override Task DeleteMessageAsync(MessageData message, SessionBase session
/// <summary>
/// Marks this control queue as released: cancels <c>releaseTokenSource</c> and then
/// sets <c>IsReleased</c> to true.
/// </summary>
public void Release()
{
// Signal cancellation on the release token first.
// NOTE(review): presumably this is what stops the dequeue loop - confirm against the loop code.
this.releaseTokenSource.Cancel();

// Note that IsReleased is also set to true when the dequeue loop ends, so this assignment
// is somewhat redundant. It was added mostly to make tests run more predictably.
this.IsReleased = true;
}

public virtual void Dispose()
Expand Down
6 changes: 3 additions & 3 deletions src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ await this.storageQueue.AddMessageAsync(
// We assume that auto-started orchestrations (i.e. instance ids starting with '@')
// are used exclusively by durable entities; so we can follow
// a custom naming convention to pass a time parameter.
var eventName = eventRaisedEvent.Name;
if (eventName.Length >= 3 && eventName[2] == '@'
&& DateTime.TryParse(eventRaisedEvent.Name.Substring(3), out var scheduledTime))
string eventName = eventRaisedEvent.Name;
if (eventName != null && eventName.Length >= 3 && eventName[2] == '@'
&& DateTime.TryParse(eventName.Substring(3), out DateTime scheduledTime))
{
initialVisibilityDelay = scheduledTime.ToUniversalTime() - DateTime.UtcNow;
if (initialVisibilityDelay < TimeSpan.Zero)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ namespace DurableTask.AzureStorage
using DurableTask.AzureStorage.Monitoring;
using DurableTask.AzureStorage.Tracking;
using DurableTask.Core;
using Microsoft.WindowsAzure.Storage;

class OrchestrationSessionManager : IDisposable
{
Expand Down

0 comments on commit f14720a

Please sign in to comment.