Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DurableTask.AzureStorage API to enumerate instances #187

Merged
merged 11 commits into from
Jun 5, 2018
25 changes: 25 additions & 0 deletions Test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,31 @@ public async Task SequentialOrchestrationNoReplay()
await host.StopAsync();
}
}

[TestMethod]
public async Task GetAllOrchestrationStatuses()
{
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false))
{
// Execute the orchestrator twice. Orchestrator will be replied. However instances might be two.
await host.StartAsync();
var client = await host.StartOrchestrationAsync(typeof(Orchestrations.SayHelloInline), "wolrd one");
await client.WaitForCompletionAsync(TimeSpan.FromSeconds(30));
client = await host.StartOrchestrationAsync(typeof(Orchestrations.SayHelloInline), "wolrd two");
await client.WaitForCompletionAsync(TimeSpan.FromSeconds(30));
// Create a client for testing
var serviceClient = host.GetServiceClient();
// TODO Currently we can't use TaskHub. It requires review of Core team.
// Until then, we test it, not using TaskHub. Call diretly the method with some configuration.
var results = await serviceClient.GetOrchestrationStateAsync();
Assert.AreEqual(2, results.Count);
Assert.AreEqual("\"Hello, wolrd one!\"", results[0].Output);
Assert.AreEqual("\"Hello, wolrd two!\"", results[1].Output);

await host.StopAsync();
}
}

/// <summary>
/// End-to-end test which validates parallel function execution by enumerating all files in the current directory
/// in parallel and getting the sum total of all file sizes.
Expand Down
12 changes: 12 additions & 0 deletions Test/DurableTask.AzureStorage.Tests/TestOrchestrationHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ public Task StopAsync()
return this.worker.StopAsync(isForced: true);
}

/// <summary>
/// This method is only for testing purpose.
/// When we need to add fix to the DurableTask.Core (e.g. TaskHubClient), we need approval process.
/// during wating for the approval, we can use this method to test the method.
/// This method is not allowed for the production. Before going to the production, please refacotr to use TaskHubClient instead.
/// </summary>
/// <returns></returns>
internal AzureStorageOrchestrationService GetServiceClient()
{
return (AzureStorageOrchestrationService)this.client.serviceClient;
}

public async Task<TestOrchestrationClient> StartOrchestrationAsync(
Type orchestrationType,
object input,
Expand Down
10 changes: 10 additions & 0 deletions src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1543,6 +1543,16 @@ public async Task<OrchestrationState> GetOrchestrationStateAsync(string instance
return await this.trackingStore.GetStateAsync(instanceId, executionId);
}

/// <summary>
/// Get states of the all orchestration instances
/// </summary>
/// <returns>List of <see cref="OrchestrationState"/></returns>
public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(CancellationToken cancellationToken = default(CancellationToken))
{
await this.EnsureTaskHubAsync();
return await this.trackingStore.GetStateAsync(cancellationToken);
}

/// <summary>
/// Force terminates an orchestration by sending a execution terminated event
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net451</TargetFrameworks>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Version>1.2.2</Version>
<Version>1.2.3</Version>
<IncludeSymbols>true</IncludeSymbols>
<Description>Azure Storage provider extension for the Durable Task Framework.</Description>
<Authors>cgillum</Authors>
Expand Down
34 changes: 34 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,11 @@ public override async Task<OrchestrationState> GetStateAsync(string instanceId,
{
return null;
}
return await ConvertFromAsync(orchestrationInstanceStatus, instanceId);
}

private async Task<OrchestrationState> ConvertFromAsync(OrchestrationInstanceStatus orchestrationInstanceStatus, string instanceId)
{
var orchestrationState = new OrchestrationState();
if (!Enum.TryParse(orchestrationInstanceStatus.RuntimeStatus, out orchestrationState.OrchestrationStatus))
{
Expand Down Expand Up @@ -305,6 +309,36 @@ public override async Task<OrchestrationState> GetStateAsync(string instanceId,
return orchestrationState;
}

/// <inheritdoc />
public override async Task<IList<OrchestrationState>> GetStateAsync(CancellationToken cancellationToken = default(CancellationToken))
{
var query = new TableQuery<OrchestrationInstanceStatus>();
TableContinuationToken token = null;

var orchestrationStates = new List<OrchestrationState>(100);

while (true)
{
var segment = await this.instancesTable.ExecuteQuerySegmentedAsync(query, token); // TODO make sure if it has enough parameters

int previousCount = orchestrationStates.Count;
var tasks = segment.AsEnumerable<OrchestrationInstanceStatus>().Select(async x => await ConvertFromAsync(x, x.PartitionKey));
OrchestrationState[] result = await Task.WhenAll(tasks);
orchestrationStates.AddRange(result);

this.stats.StorageRequests.Increment();
this.stats.TableEntitiesRead.Increment(orchestrationStates.Count - previousCount);

token = segment.ContinuationToken;
if (token == null || cancellationToken.IsCancellationRequested)
{
break;
}
}

return orchestrationStates;
}

/// <inheritdoc />
public override Task PurgeHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
{
Expand Down
6 changes: 6 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ interface ITrackingStore
/// <param name="executionId">Execution Id</param>
Task<OrchestrationState> GetStateAsync(string instanceId, string executionId);

/// <summary>
/// Get The Orchestration State for querying all orchestration instances
/// </summary>
/// <returns></returns>
Task<IList<OrchestrationState>> GetStateAsync(CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Used to set a state in the tracking store whenever a new execution is initiated from the client
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ public override async Task<OrchestrationState> GetStateAsync(string instanceId,
}
}

/// <inheritdoc />
public override Task<IList<OrchestrationState>> GetStateAsync(CancellationToken cancellationToken = default(CancellationToken))
{
throw new NotImplementedException();
}

/// <inheritdoc />
public override Task PurgeHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
{
Expand Down
3 changes: 3 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ abstract class TrackingStoreBase : ITrackingStore
/// <inheritdoc />
public abstract Task<OrchestrationState> GetStateAsync(string instanceId, string executionId);

/// <inheritdoc />
public abstract Task<IList<OrchestrationState>> GetStateAsync(CancellationToken cancellationToken = default(CancellationToken));

/// <inheritdoc />
public abstract Task PurgeHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType);

Expand Down