Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DurableTask.AzureStorage API to enumerate instances #187

Merged
merged 11 commits into from
Jun 5, 2018
80 changes: 80 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,86 @@ public override async Task<OrchestrationState> GetStateAsync(string instanceId,
return orchestrationState;
}

/// <inheritdoc />
public override async Task<IList<OrchestrationState>> GetStateAsync(CancellationToken cancellationToken = default(CancellationToken))
{
var query = new TableQuery();
TableContinuationToken token = null;
var instanceTableEntities = new List<DynamicTableEntity>(100);
var stopwatch = new Stopwatch();
var requestCount = 0;
bool finishedEarly = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like stopwatch, requestCount, and finishedEarly aren't really used for anything. Can they be removed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!


while (true)
{
requestCount++;
stopwatch.Start();
var segment = await this.instancesTable.ExecuteQuerySegmentedAsync(query, token); // TODO make sure if it has enough parameters
stopwatch.Stop();

int previousCount = instanceTableEntities.Count;
instanceTableEntities.AddRange(segment);

// TODO do we need these?
this.stats.StorageRequests.Increment();
this.stats.TableEntitiesRead.Increment(instanceTableEntities.Count - previousCount);

token = segment.ContinuationToken;
if (finishedEarly || token == null || cancellationToken.IsCancellationRequested)
{
break;
}
}

IList<OrchestrationState> orchestrationStates;

orchestrationStates = new List<OrchestrationState>(instanceTableEntities.Count);

if (instanceTableEntities.Count > 0)
{
foreach(DynamicTableEntity entity in instanceTableEntities)
{
var instance = new OrchestrationInstance();
instance.InstanceId = entity.Properties["PartitionKey"].ToString();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a look at the previous GetStateAsync method above. Since it already has logic to convert table storage rows into OrchestrationState objects, I think it would be best to reuse that code as much as possible. That way if we need to change the logic for reading from table storage, we can do it in one place instead of two places.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I’ll fix that!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @cgillum ,

Will it make sense to add additional partition in the Instance table with partition key "OnTheFly" (and row key the instance id) that will contain all the instances currently on the fly. Each of the instance will have also dedicated row with its instance id as partition key as well. We can have a setting that will enable the duplicate update of the instance in the "OnTheFly" partition and its own partition. When the instance completes we will delete the row in the "OnTheFly" partition. This will be one more call but we will know both the partition and the row key.

Then the query for retrieving the on the fly instances will be straightforward. I am worried that the Instnaces table will grow over time and a query that will make table scan each time will have bad performance impact.

Thank you!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! lets discuss that. I also worry about the performance since storage table only have a key for partition key and the instance table might be growing. :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @gled4er , One question. OrchestrationInstanceStatus might be a domain class of instance table right? Then which attribute is the instanceId? I guess PartitionKey. Is it right?


// TODO we need to discuss the parameter and query structure.
orchestrationStates.Add(
new OrchestrationState
{
CompletedTime = DateTime.Parse(entity.Properties["CompletedTime"]?.ToString()),
Name = entity.Properties["Name"]?.ToString(),
OrchestrationInstance = instance,
OrchestrationStatus = ConvertStatus(entity.Properties["OrchestrationStatus"].ToString())
}
);
}
}
return orchestrationStates;
}

private OrchestrationStatus ConvertStatus(string state)
{
switch(state)
{
case "Canceled":
return OrchestrationStatus.Canceled;
case "Completed":
return OrchestrationStatus.Completed;
case "ContinuedAsNew":
return OrchestrationStatus.ContinuedAsNew;
case "Failed":
return OrchestrationStatus.Failed;
case "Pending":
return OrchestrationStatus.Pending;
case "Running":
return OrchestrationStatus.Running;
case "Terminated":
return OrchestrationStatus.Terminated;
default:
throw new ArgumentException($"OrchestrationStatus is not what I expected: {state}");
}
}

/// <inheritdoc />
public override Task PurgeHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
{
Expand Down
6 changes: 6 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ interface ITrackingStore
/// <param name="executionId">Execution Id</param>
Task<OrchestrationState> GetStateAsync(string instanceId, string executionId);

/// <summary>
/// Get The Orchestration State for querying all orchestration instances
/// </summary>
/// <returns></returns>
Task<IList<OrchestrationState>> GetStateAsync(CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Used to set a state in the tracking store whenever a new execution is initiated from the client
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ public override async Task<OrchestrationState> GetStateAsync(string instanceId,
}
}

/// <inheritdoc />
public override Task<IList<OrchestrationState>> GetStateAsync(CancellationToken cancellationToken = default(CancellationToken))
{
throw new NotImplementedException();
}

/// <inheritdoc />
public override Task PurgeHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
{
Expand Down
3 changes: 3 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ abstract class TrackingStoreBase : ITrackingStore
/// <inheritdoc />
public abstract Task<OrchestrationState> GetStateAsync(string instanceId, string executionId);

/// <inheritdoc />
public abstract Task<IList<OrchestrationState>> GetStateAsync(CancellationToken cancellationToken = default(CancellationToken));

/// <inheritdoc />
public abstract Task PurgeHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType);

Expand Down