Skip to content

Commit

Permalink
DurableTask.AzureStorage API to enumerate instances (#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
TsuyoshiUshio authored and cgillum committed Jun 5, 2018
1 parent 9b9aa52 commit 3f10f0d
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 1 deletion.
25 changes: 25 additions & 0 deletions Test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,31 @@ public async Task SequentialOrchestrationNoReplay()
await host.StopAsync();
}
}

[TestMethod]
public async Task GetAllOrchestrationStatuses()
{
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false))
{
// Execute the orchestrator twice. Orchestrator will be replied. However instances might be two.
await host.StartAsync();
var client = await host.StartOrchestrationAsync(typeof(Orchestrations.SayHelloInline), "wolrd one");
await client.WaitForCompletionAsync(TimeSpan.FromSeconds(30));
client = await host.StartOrchestrationAsync(typeof(Orchestrations.SayHelloInline), "wolrd two");
await client.WaitForCompletionAsync(TimeSpan.FromSeconds(30));
// Create a client for testing
var serviceClient = host.GetServiceClient();
// TODO Currently we can't use TaskHub. It requires review of Core team.
// Until then, we test it, not using TaskHub. Call diretly the method with some configuration.
var results = await serviceClient.GetOrchestrationStateAsync();
Assert.AreEqual(2, results.Count);
Assert.AreEqual("\"Hello, wolrd one!\"", results[0].Output);
Assert.AreEqual("\"Hello, wolrd two!\"", results[1].Output);

await host.StopAsync();
}
}

/// <summary>
/// End-to-end test which validates parallel function execution by enumerating all files in the current directory
/// in parallel and getting the sum total of all file sizes.
Expand Down
12 changes: 12 additions & 0 deletions Test/DurableTask.AzureStorage.Tests/TestOrchestrationHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ public Task StopAsync()
return this.worker.StopAsync(isForced: true);
}

/// <summary>
/// This method is only for testing purpose.
/// When we need to add fix to the DurableTask.Core (e.g. TaskHubClient), we need approval process.
/// during wating for the approval, we can use this method to test the method.
/// This method is not allowed for the production. Before going to the production, please refacotr to use TaskHubClient instead.
/// </summary>
/// <returns></returns>
internal AzureStorageOrchestrationService GetServiceClient()
{
return (AzureStorageOrchestrationService)this.client.serviceClient;
}

public async Task<TestOrchestrationClient> StartOrchestrationAsync(
Type orchestrationType,
object input,
Expand Down
10 changes: 10 additions & 0 deletions src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1543,6 +1543,16 @@ public async Task<OrchestrationState> GetOrchestrationStateAsync(string instance
return await this.trackingStore.GetStateAsync(instanceId, executionId);
}

/// <summary>
/// Get states of the all orchestration instances
/// </summary>
/// <returns>List of <see cref="OrchestrationState"/></returns>
public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(CancellationToken cancellationToken = default(CancellationToken))
{
await this.EnsureTaskHubAsync();
return await this.trackingStore.GetStateAsync(cancellationToken);
}

/// <summary>
/// Force terminates an orchestration by sending a execution terminated event
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net451</TargetFrameworks>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Version>1.2.2</Version>
<Version>1.2.3</Version>
<IncludeSymbols>true</IncludeSymbols>
<Description>Azure Storage provider extension for the Durable Task Framework.</Description>
<Authors>cgillum</Authors>
Expand Down
34 changes: 34 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,11 @@ public override async Task<OrchestrationState> GetStateAsync(string instanceId,
{
return null;
}
return await ConvertFromAsync(orchestrationInstanceStatus, instanceId);
}

private async Task<OrchestrationState> ConvertFromAsync(OrchestrationInstanceStatus orchestrationInstanceStatus, string instanceId)
{
var orchestrationState = new OrchestrationState();
if (!Enum.TryParse(orchestrationInstanceStatus.RuntimeStatus, out orchestrationState.OrchestrationStatus))
{
Expand Down Expand Up @@ -305,6 +309,36 @@ public override async Task<OrchestrationState> GetStateAsync(string instanceId,
return orchestrationState;
}

/// <inheritdoc />
public override async Task<IList<OrchestrationState>> GetStateAsync(CancellationToken cancellationToken = default(CancellationToken))
{
var query = new TableQuery<OrchestrationInstanceStatus>();
TableContinuationToken token = null;

var orchestrationStates = new List<OrchestrationState>(100);

while (true)
{
var segment = await this.instancesTable.ExecuteQuerySegmentedAsync(query, token); // TODO make sure if it has enough parameters

int previousCount = orchestrationStates.Count;
var tasks = segment.AsEnumerable<OrchestrationInstanceStatus>().Select(async x => await ConvertFromAsync(x, x.PartitionKey));
OrchestrationState[] result = await Task.WhenAll(tasks);
orchestrationStates.AddRange(result);

this.stats.StorageRequests.Increment();
this.stats.TableEntitiesRead.Increment(orchestrationStates.Count - previousCount);

token = segment.ContinuationToken;
if (token == null || cancellationToken.IsCancellationRequested)
{
break;
}
}

return orchestrationStates;
}

/// <inheritdoc />
public override Task PurgeHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
{
Expand Down
6 changes: 6 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ interface ITrackingStore
/// <param name="executionId">Execution Id</param>
Task<OrchestrationState> GetStateAsync(string instanceId, string executionId);

/// <summary>
/// Get The Orchestration State for querying all orchestration instances
/// </summary>
/// <returns></returns>
Task<IList<OrchestrationState>> GetStateAsync(CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Used to set a state in the tracking store whenever a new execution is initiated from the client
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ public override async Task<OrchestrationState> GetStateAsync(string instanceId,
}
}

/// <inheritdoc />
public override Task<IList<OrchestrationState>> GetStateAsync(CancellationToken cancellationToken = default(CancellationToken))
{
throw new NotImplementedException();
}

/// <inheritdoc />
public override Task PurgeHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
{
Expand Down
3 changes: 3 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ abstract class TrackingStoreBase : ITrackingStore
/// <inheritdoc />
public abstract Task<OrchestrationState> GetStateAsync(string instanceId, string executionId);

/// <inheritdoc />
public abstract Task<IList<OrchestrationState>> GetStateAsync(CancellationToken cancellationToken = default(CancellationToken));

/// <inheritdoc />
public abstract Task PurgeHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType);

Expand Down

0 comments on commit 3f10f0d

Please sign in to comment.