Azure Storage: Abandon prefetched orchestrator messages if the lease is lost #360

Merged: 2 commits merged on Jan 6, 2020
Changes from 1 commit
82 changes: 77 additions & 5 deletions Test/DurableTask.AzureStorage.Tests/AzureStorageScaleTests.cs
@@ -21,11 +21,14 @@ namespace DurableTask.AzureStorage.Tests
using System.Threading.Tasks;
using DurableTask.AzureStorage.Messaging;
using DurableTask.AzureStorage.Monitoring;
using DurableTask.AzureStorage.Partitioning;
using DurableTask.AzureStorage.Tracking;
using DurableTask.Core;
using DurableTask.Core.History;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Queue;
using Microsoft.WindowsAzure.Storage.Table;

/// <summary>
@@ -172,11 +175,11 @@ public async Task<List<IListBlobItem>> ListBlobsAsync(CloudBlobDirectory client)
return results;
}

/// <summary>
/// REQUIREMENT: Workers can be added or removed at any time and control-queue partitions are load-balanced automatically.
/// REQUIREMENT: No two workers will ever process the same control queue.
/// </summary>
[TestMethod]
/// <summary>
/// REQUIREMENT: Workers can be added or removed at any time and control-queue partitions are load-balanced automatically.
/// REQUIREMENT: No two workers will ever process the same control queue.
/// </summary>
[TestMethod]
public async Task MultiWorkerLeaseMovement()
{
const int MaxWorkerCount = 4;
@@ -371,6 +374,75 @@ public async Task TestInstanceAndMessageDistribution()
}
}

/// <summary>
/// If a partition is lost, verify that all pre-fetched messages associated
/// with that partition are abandoned and not processed.
/// </summary>
[TestMethod]
public async Task PartitionLost_AbandonPrefetchedSession()
Member Author:
Like other tests in this class, we're testing methods on AzureStorageOrchestrationService rather than going through TaskHubWorker. We're still going through Azure Storage, but we have a bit more control using this test approach.

{
var settings = new AzureStorageOrchestrationServiceSettings()
{
PartitionCount = 1,
LeaseRenewInterval = TimeSpan.FromMilliseconds(500),
TaskHubName = TestHelpers.GetTestTaskHubName(),
StorageConnectionString = TestHelpers.GetTestStorageAccountConnectionString(),
ControlQueueBufferThreshold = 100,
};

// STEP 1: Start up the service and queue up a large number of messages
var service = new AzureStorageOrchestrationService(settings);
await service.CreateAsync();
await service.StartAsync();

// These instance IDs are set up specifically to bypass message validation logic
// that might otherwise discard these messages as out-of-order, invalid, etc.
var sourceInstance = new OrchestrationInstance();
var targetInstance = new OrchestrationInstance { InstanceId = "@counter@xyz" };

await TestHelpers.WaitFor(
condition: () => service.OwnedControlQueues.Any(),
timeout: TimeSpan.FromSeconds(10));
ControlQueue controlQueue = service.OwnedControlQueues.Single();

List<TaskMessage> messages = Enumerable.Range(0, 100).Select(i => new TaskMessage
{
Event = new EventRaisedEvent(-1, null),
SequenceNumber = i,
OrchestrationInstance = targetInstance,
}).ToList();

await messages.ParallelForEachAsync(
maxConcurrency: 50,
action: msg => controlQueue.AddMessageAsync(msg, sourceInstance));

// STEP 2: Force the lease to be stolen and wait for the lease status to update.
// The orchestration service should detect this and update its state.
BlobLease lease = (await service.ListBlobLeasesAsync()).Single();
await lease.Blob.ChangeLeaseAsync(
proposedLeaseId: Guid.NewGuid().ToString(),
accessCondition: AccessCondition.GenerateLeaseCondition(lease.Token));
await TestHelpers.WaitFor(
condition: () => !service.OwnedControlQueues.Any(),
timeout: TimeSpan.FromSeconds(10));

// Small additional delay to account for tiny race condition.
Contributor:
Is the race condition that we may have gotten rid of the lease but not yet abandoned our work items?

Member Author:
No, the work item is abandoned as part of the LockNextTaskOrchestrationWorkItemAsync call, so there is no racing there. The race referred to here is that the OwnedControlQueues collection gets updated one instruction before ControlQueue.Release() is called, which means TestHelpers.WaitFor could theoretically complete and the test could move on to its next step too soon. It would be extremely rare, but not impossible. I added this delay to avoid the possibility of flakiness in the test execution.
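
To illustrate the ordering being described, a rough sketch of the release path (the member names and shape here are assumptions for illustration, not the actual implementation):

// Hypothetical release sequence inside the orchestration service:
this.ownedControlQueues.TryRemove(partitionId, out ControlQueue queue); // WaitFor's condition can already observe this...
queue.Release();                                                        // ...one instruction before the queue is actually released.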

await Task.Delay(250);

// STEP 3: Try to get an orchestration work item - a null value should be returned
// because the lease was lost.
var workItem = await service.LockNextTaskOrchestrationWorkItemAsync(
TimeSpan.FromMinutes(5),
CancellationToken.None);
Assert.IsNull(workItem);

// STEP 4: Verify that all the enqueued messages were abandoned, i.e. put back
// onto the queue with their dequeue counts incremented.
IEnumerable<CloudQueueMessage> queueMessages =
await controlQueue.InnerQueue.PeekMessagesAsync(settings.ControlQueueBatchSize);
Assert.IsTrue(queueMessages.All(msg => msg.DequeueCount == 1));
}

[TestMethod]
public async Task MonitorIdleTaskHubDisconnected()
{
28 changes: 27 additions & 1 deletion Test/DurableTask.AzureStorage.Tests/TestHelpers.cs
@@ -15,6 +15,8 @@ namespace DurableTask.AzureStorage.Tests
{
using System;
using System.Configuration;
using System.Diagnostics;
using System.Threading.Tasks;

static class TestHelpers
{
@@ -28,7 +30,7 @@ public static TestOrchestrationHost GetTestOrchestrationHost(
var settings = new AzureStorageOrchestrationServiceSettings
{
StorageConnectionString = storageConnectionString,
TaskHubName = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None).AppSettings.Settings["TaskHubName"].Value,
TaskHubName = GetTestTaskHubName(),
ExtendedSessionsEnabled = enableExtendedSessions,
ExtendedSessionIdleTimeout = TimeSpan.FromSeconds(extendedSessionTimeoutInSeconds),
FetchLargeMessageDataEnabled = fetchLargeMessages,
@@ -48,6 +50,12 @@ public static string GetTestStorageAccountConnectionString()
return storageConnectionString;
}

public static string GetTestTaskHubName()
{
Configuration appConfig = ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);
return appConfig.AppSettings.Settings["TaskHubName"].Value;
}

static string GetTestSetting(string name)
{
string value = Environment.GetEnvironmentVariable("DurableTaskTest" + name);
@@ -58,5 +66,23 @@ static string GetTestSetting(string name)

return value;
}

public static async Task WaitFor(Func<bool> condition, TimeSpan timeout)
{
Stopwatch timer = Stopwatch.StartNew();
do
{
bool result = condition();
if (result)
{
return;
}

await Task.Delay(TimeSpan.FromMilliseconds(100));

} while (timer.Elapsed < timeout);

throw new TimeoutException("Timed out waiting for condition to be true.");
}
}
}
@@ -610,6 +610,13 @@ public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAs
return null;
}

// Make sure we still own the partition. If not, abandon the session.
if (session.ControlQueue.IsReleased)
{
await this.AbandonAndReleaseSessionAsync(session);
return null;
}
Member Author:
This is the relevant change. It turned out that we were already setting IsReleased when we stop listening on a particular queue/lease. Here I'm just checking to see if a control queue was released before we try to process the messages.


session.StartNewLogicalTraceScope();

List<MessageData> outOfOrderMessages = null;
1 change: 1 addition & 0 deletions src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
@@ -221,6 +221,7 @@ public override Task DeleteMessageAsync(MessageData message, SessionBase session
public void Release()
{
this.releaseTokenSource.Cancel();
this.IsReleased = true;
Member Author:
IsReleased would have been set to true anyway because of the cancellation token handling logic, but there can be a bit of a delay. I added this additional IsReleased = true here to speed up the process and make my new test succeed more quickly and reliably.
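
Roughly, the pre-existing path looks something like this (the loop shape and method name are assumptions for illustration only):

// Hypothetical dequeue loop: IsReleased is only set after the current iteration
// observes the cancelled token, which is where the delay comes from.
while (!this.releaseTokenSource.IsCancellationRequested)
{
    await this.DequeueAndDispatchMessagesAsync();
}

this.IsReleased = true;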

Contributor:
Would love to see a comment that explains this in the code, as it may confuse us in the future otherwise.

Member Author:
Sure, I can add that.
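
For example, a comment along these lines could capture the intent (the wording is a suggestion, not the text that was actually committed):

public void Release()
{
    this.releaseTokenSource.Cancel();

    // NOTE: The cancellation-token handling elsewhere eventually sets IsReleased as well,
    // but with a small delay. Setting it here makes the release observable immediately,
    // e.g. by LockNextTaskOrchestrationWorkItemAsync.
    this.IsReleased = true;
}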

}

public virtual void Dispose()
6 changes: 3 additions & 3 deletions src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs
@@ -179,9 +179,9 @@ await this.storageQueue.AddMessageAsync(
// We assume that auto-started orchestrations (i.e. instance ids starting with '@')
// are used exclusively by durable entities; so we can follow
// a custom naming convention to pass a time parameter.
var eventName = eventRaisedEvent.Name;
if (eventName.Length >= 3 && eventName[2] == '@'
&& DateTime.TryParse(eventRaisedEvent.Name.Substring(3), out var scheduledTime))
string eventName = eventRaisedEvent.Name;
if (eventName != null && eventName.Length >= 3 && eventName[2] == '@'
&& DateTime.TryParse(eventName.Substring(3), out DateTime scheduledTime))
Member Author:
This is a fix for a null-ref issue I observed when developing my test. I don't think we would hit it under normal circumstances with Durable Entities, since users don't control the Name property of raised events, but it's theoretically possible for anyone using DurableTask.AzureStorage to hit this if they use our entity naming convention.
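
As a small illustration of the convention check (the event name value below is made up for this sketch):

// Hypothetical auto-started entity signal name following the "xx@<time>" convention above.
string eventName = "op@2021-06-01T00:00:00Z";

bool hasScheduledTime =
    eventName != null                        // the added null check; a null Name previously threw on .Length
    && eventName.Length >= 3
    && eventName[2] == '@'
    && DateTime.TryParse(eventName.Substring(3), out DateTime scheduledTime);

// hasScheduledTime is true here; with a null Name the check is now simply false instead of throwing.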

{
initialVisibilityDelay = scheduledTime.ToUniversalTime() - DateTime.UtcNow;
if (initialVisibilityDelay < TimeSpan.Zero)
@@ -23,7 +23,6 @@ namespace DurableTask.AzureStorage
using DurableTask.AzureStorage.Monitoring;
using DurableTask.AzureStorage.Tracking;
using DurableTask.Core;
using Microsoft.WindowsAzure.Storage;

class OrchestrationSessionManager : IDisposable
{