Skip to content

Commit

Permalink
Initial commit: API surface area and non-passing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cgillum committed Nov 2, 2024
1 parent e0b54b8 commit 80e1322
Show file tree
Hide file tree
Showing 4 changed files with 327 additions and 1 deletion.
26 changes: 26 additions & 0 deletions src/Client/Core/DurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,32 @@ public virtual Task ResumeInstanceAsync(string instanceId, CancellationToken can
public abstract Task ResumeInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default);

/// <summary>
/// Rewinds the specified orchestration instance to a previous, non-failed state.
/// </summary>
/// <remarks>
/// <para>
/// Only orchestrations in a failed state can be rewound. Attempting to rewind an orchestration in a non-failed
/// state may result in either a no-op or a failure depending on the backend implementation.
/// </para><para>
/// Rewind works by rewriting an orchestration's history to remove the most recent failure records, and then
/// re-executing the orchestration with the modified history. This effectively "rewinds" the orchestration to a
/// previous "good" state, allowing it to re-execute the logic that caused the original failure.
/// </para><para>
/// Rewinding an orchestration is intended to be used in cases where a failure is caused by a transient issue that
/// has since been resolved. It is not intended to be used as a general-purpose retry mechanism.
/// </para>
/// </remarks>
/// <param name="instanceId">The instance ID of the orchestration to rewind.</param>
/// <param name="reason">The optional rewind reason, which is recorded in the orchestration history.</param>
/// <param name="cancellation">
/// A <see cref="CancellationToken"/> that can be used to cancel the rewind API call. Note that cancelling this
/// token does not cancel the rewind operation once it has been successfully enqueued.
/// </param>
/// <returns>A task that completes when the rewind operation has been successfully enqueued.</returns>
public abstract Task RewindInstanceAsync(
string instanceId, string? reason = null, CancellationToken cancellation = default);

/// <inheritdoc cref="GetInstanceAsync(string, bool, CancellationToken)"/>
public virtual Task<OrchestrationMetadata?> GetInstanceAsync(
string instanceId, CancellationToken cancellation)
Expand Down
22 changes: 22 additions & 0 deletions src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,28 @@ public override async Task ResumeInstanceAsync(
}
}

/// <inheritdoc/>
public override async Task RewindInstanceAsync(
    string instanceId, string? reason = null, CancellationToken cancellation = default)
{
    // Reject a missing or empty instance ID before any request is built or sent.
    if (string.IsNullOrEmpty(instanceId))
    {
        throw new ArgumentNullException(nameof(instanceId));
    }

    P.RewindInstanceRequest request = new()
    {
        InstanceId = instanceId,
        Reason = reason,
    };

    try
    {
        await this.sidecarClient.RewindInstanceAsync(request, cancellationToken: cancellation);
    }
    catch (RpcException rpcError) when (rpcError.StatusCode == StatusCode.Cancelled)
    {
        // Surface a gRPC-level cancellation as the standard .NET cancellation exception,
        // keeping the original RPC error as the inner exception.
        string message = $"The {nameof(this.RewindInstanceAsync)} operation was canceled.";
        throw new OperationCanceledException(message, rpcError, cancellation);
    }
}

/// <inheritdoc/>
public override async Task<OrchestrationMetadata?> GetInstancesAsync(
string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ public override Task TerminateInstanceAsync(
return this.Client.ForceTerminateTaskOrchestrationAsync(instanceId, reason);
}

/// <inheritdoc/>
public override Task RewindInstanceAsync(
    string instanceId, string? reason = null, CancellationToken cancellation = default)
{
    // Rewind is currently exposed only by specific backend implementations (for example, Azure Storage);
    // at the time of writing no IOrchestrationXXXClient interface in DurableTask.Core declares it.
    // When such an interface is added, this method can forward the call instead of throwing.
    const string message = "Rewind is not supported by the current client.";
    throw new NotSupportedException(message);
}

/// <inheritdoc/>
public override async Task<OrchestrationMetadata> WaitForInstanceCompletionAsync(
string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default)
Expand Down Expand Up @@ -219,7 +229,7 @@ public override async Task<OrchestrationMetadata> WaitForInstanceStartAsync(
}
}

[return: NotNullIfNotNull("state")]
[return: NotNullIfNotNull(nameof(state))]
OrchestrationMetadata? ToMetadata(Core.OrchestrationState? state, bool getInputsAndOutputs)
{
if (state is null)
Expand Down
268 changes: 268 additions & 0 deletions test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.

using System.Runtime.Serialization;
using DurableTask.Core;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Worker;
using Xunit.Abstractions;
Expand Down Expand Up @@ -589,6 +590,273 @@ static void ValidateInnermostFailureDetailsChain(TaskFailureDetails? failureDeta
ValidateInnermostFailureDetailsChain(metadata.FailureDetails.InnerFailure.InnerFailure);
}

/// <summary>
/// Tests the behavior of a failed orchestration when it is rewound and re-executed.
/// </summary>
/// <remarks>
/// The orchestration calls a single activity that throws while <c>isBusted</c> is set. After the flag is
/// cleared and the instance is rewound, the orchestration is expected to reach the completed state.
/// </remarks>
/// <returns>A task that completes when the test has finished.</returns>
[Fact]
public async Task RewindSingleFailedActivity()
{
    // Captured by the activity lambda below; flipping it simulates fixing the underlying problem.
    bool isBusted = true;

    TaskName orchestratorName = "BustedOrchestration";
    TaskName activityName = "BustedActivity";

    await using HostTestLifetime server = await this.StartWorkerAsync(b =>
    {
        b.AddTasks(tasks =>
            tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
            {
                await ctx.CallActivityAsync(activityName);
            })
            .AddActivityFunc(activityName, (TaskActivityContext context) =>
            {
                if (isBusted)
                {
                    throw new Exception("Kah-BOOOOOM!!!");
                }
            }));
    });

    DurableTaskClient client = server.Client;

    // Start the orchestration and wait for it to fail.
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
    OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);

    // Simulate "fixing" the original problem by setting the flag to false.
    isBusted = false;

    // Rewind the orchestration to put it back into a running state. It should complete successfully this time.
    // The timeout token is passed so that a stalled rewind call fails the test instead of hanging it.
    await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration", this.TimeoutToken);
    metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
}

/// <summary>
/// Tests the behavior of a failed orchestration when it is rewound and re-executed multiple times.
/// </summary>
/// <remarks>
/// The orchestration calls two activities in sequence, each guarded by its own failure flag. The flags are
/// cleared one at a time with a rewind after each, so the instance is expected to fail twice (with a different
/// error message each time) before finally completing. Each activity is expected to complete exactly once.
/// </remarks>
/// <returns>A task that completes when the test has finished.</returns>
[Fact]
public async Task RewindMultipleFailedActivities_Serial()
{
    // Captured by the activity lambdas below; each flag controls whether the matching activity throws.
    bool isBusted1 = true;
    bool isBusted2 = true;

    TaskName orchestratorName = "BustedOrchestration";
    TaskName activityName1 = "BustedActivity1";
    TaskName activityName2 = "BustedActivity2";

    // Incremented only when an activity runs to completion (i.e. does not throw).
    int activity1CompletionCount = 0;
    int activity2CompletionCount = 0;

    await using HostTestLifetime server = await this.StartWorkerAsync(b =>
    {
        b.AddTasks(tasks =>
            tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
            {
                // Take the result of the first activity and pass it to the second activity
                int result = await ctx.CallActivityAsync<int>(activityName1);
                return await ctx.CallActivityAsync<int>(activityName2, input: result);
            })
            .AddActivityFunc(activityName1, (TaskActivityContext context) =>
            {
                if (isBusted1)
                {
                    throw new Exception("Failure1");
                }

                activity1CompletionCount++;
                return 1;
            })
            .AddActivityFunc(activityName2, (TaskActivityContext context, int input) =>
            {
                if (isBusted2)
                {
                    throw new Exception("Failure2");
                }

                activity2CompletionCount++;
                return input + 1;
            }));
    });

    DurableTaskClient client = server.Client;

    // Start the orchestration and wait for it to fail with the first activity's error.
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
    OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);
    Assert.Equal("Failure1", metadata.FailureDetails?.ErrorMessage);

    // Simulate "fixing" just the first problem by setting the first flag to false.
    isBusted1 = false;

    // Rewind the orchestration. It should fail again, but this time with a different error message.
    // The timeout token is passed so that a stalled rewind call fails the test instead of hanging it.
    await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration", this.TimeoutToken);
    metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);
    Assert.Equal("Failure2", metadata.FailureDetails?.ErrorMessage);

    // Simulate "fixing" the second problem by setting the second flag to false.
    isBusted2 = false;

    // Rewind the orchestration again to put it back into a running state. It should now complete successfully.
    await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration", this.TimeoutToken);
    metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
    Assert.Equal(2, metadata.ReadOutputAs<int>());

    // Confirm that each activity completed exactly once (i.e. successful activity calls aren't rewound).
    Assert.Equal(1, activity1CompletionCount);
    Assert.Equal(1, activity2CompletionCount);
}

/// <summary>
/// Tests the behavior of a failed orchestration when multiple failures occur as part of a fan-out/fan-in, and is
/// subsequently rewound and re-executed.
/// </summary>
/// <remarks>
/// The orchestration schedules the same activity ten times and awaits all of the calls. Every call throws while
/// <c>isBusted</c> is set. After the flag is cleared and the instance is rewound, the orchestration is expected
/// to complete with the output "Done".
/// </remarks>
/// <returns>A task that completes when the test has finished.</returns>
[Fact]
public async Task RewindMultipleFailedActivities_Parallel()
{
    // Captured by the activity lambda below; flipping it simulates fixing the underlying problem.
    bool isBusted = true;

    TaskName orchestratorName = "BustedOrchestration";
    TaskName activityName = "BustedActivity";

    await using HostTestLifetime server = await this.StartWorkerAsync(b =>
    {
        b.AddTasks(tasks =>
            tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
            {
                // Run the activity function multiple times in parallel.
                // (Named so that it does not shadow the enclosing "tasks" registry parameter.)
                IList<Task> activityTasks = Enumerable.Range(0, 10)
                    .Select(i => ctx.CallActivityAsync(activityName))
                    .ToList();

                // Wait for all the activity functions to complete
                await Task.WhenAll(activityTasks);
                return "Done";
            })
            .AddActivityFunc(activityName, (TaskActivityContext context) =>
            {
                if (isBusted)
                {
                    throw new Exception("Kah-BOOOOOM!!!");
                }
            }));
    });

    DurableTaskClient client = server.Client;

    // Start the orchestration and wait for it to fail.
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
    OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);

    // Simulate "fixing" the original problem by setting the flag to false.
    isBusted = false;

    // Rewind the orchestration to put it back into a running state. It should complete successfully this time.
    // The timeout token is passed so that a stalled rewind call fails the test instead of hanging it.
    await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration", this.TimeoutToken);
    metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
    Assert.Equal("Done", metadata.ReadOutputAs<string>());
}

/// <summary>
/// Tests rewinding an orchestration that failed due to a failed sub-orchestration. The sub-orchestration is fixed
/// and the parent orchestration is rewound to allow the entire chain to complete successfully.
/// </summary>
/// <remarks>
/// The sub-orchestrator itself throws while <c>isBusted</c> is set; no activity is involved. Only the parent
/// instance is rewound by this test.
/// </remarks>
/// <returns>A task that completes when the test has finished.</returns>
[Fact]
public async Task RewindFailedSubOrchestration()
{
    // Captured by the sub-orchestrator lambda below; flipping it simulates fixing the underlying problem.
    bool isBusted = true;

    TaskName orchestratorName = "BustedOrchestrator";
    TaskName subOrchestratorName = "BustedSubOrchestrator";

    await using HostTestLifetime server = await this.StartWorkerAsync(b =>
    {
        b.AddTasks(tasks => tasks
            .AddOrchestratorFunc(orchestratorName, async ctx =>
            {
                await ctx.CallSubOrchestratorAsync(subOrchestratorName);
            })
            .AddOrchestratorFunc(subOrchestratorName, ctx =>
            {
                if (isBusted)
                {
                    throw new Exception("Kah-BOOOOOM!!!");
                }
            }));
    });

    DurableTaskClient client = server.Client;

    // Start the orchestration and wait for it to fail.
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
    OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);

    // Simulate "fixing" the original problem by setting the flag to false.
    isBusted = false;

    // Rewind the orchestration to put it back into a running state. It should complete successfully this time.
    // The timeout token is passed so that a stalled rewind call fails the test instead of hanging it.
    await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration", this.TimeoutToken);
    metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
}

/// <summary>
/// Tests rewinding an orchestration that failed due to a failed sub-orchestration, which itself failed due to an
/// activity. The entire orchestration chain is expected to fail, and rewinding the parent orchestration should
/// allow the entire chain to complete successfully.
/// </summary>
/// <remarks>
/// The failure originates in the activity called by the sub-orchestrator, which throws while <c>isBusted</c> is
/// set. Only the parent instance is rewound by this test.
/// </remarks>
/// <returns>A task that completes when the test has finished.</returns>
[Fact]
public async Task RewindFailedSubOrchestrationWithActivity()
{
    // Captured by the activity lambda below; flipping it simulates fixing the underlying problem.
    bool isBusted = true;

    TaskName orchestratorName = "BustedOrchestrator";
    TaskName subOrchestratorName = "BustedSubOrchestrator";
    TaskName activityName = "BustedActivity";

    await using HostTestLifetime server = await this.StartWorkerAsync(b =>
    {
        b.AddTasks(tasks => tasks
            .AddOrchestratorFunc(orchestratorName, async ctx =>
            {
                await ctx.CallSubOrchestratorAsync(subOrchestratorName);
            })
            .AddOrchestratorFunc(subOrchestratorName, async ctx =>
            {
                await ctx.CallActivityAsync(activityName);
            })
            .AddActivityFunc(activityName, (TaskActivityContext _) =>
            {
                if (isBusted)
                {
                    throw new Exception("Kah-BOOOOOM!!!");
                }
            }));
    });

    DurableTaskClient client = server.Client;

    // Start the orchestration and wait for it to fail.
    string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
    OrchestrationMetadata metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus);

    // Simulate "fixing" the original problem by setting the flag to false.
    isBusted = false;

    // Rewind the orchestration to put it back into a running state. It should complete successfully this time.
    // The timeout token is passed so that a stalled rewind call fails the test instead of hanging it.
    await client.RewindInstanceAsync(instanceId, "Rewind failed orchestration", this.TimeoutToken);
    metadata = await client.WaitForInstanceCompletionAsync(instanceId, this.TimeoutToken);
    Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
}

static Exception MakeException(Type exceptionType, string message)
{
// We assume the contructor of the exception type takes a single string argument
Expand Down

0 comments on commit 80e1322

Please sign in to comment.