Skip to content

[WIP] Patch orchestration failure propagation #918

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: v4.x/ps7.2
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/DurableSDK/Commands/WaitDurableTaskCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected override void EndProcessing()
}
else
{
_durableTaskHandler.WaitAll(Task, context, WriteObject);
_durableTaskHandler.WaitAll(Task, context, WriteObject, onFailure: failureReason => DurableActivityErrorHandler.Handle(this, failureReason));
}
}

Expand Down
64 changes: 63 additions & 1 deletion src/DurableSDK/DurableTaskHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
internal class DurableTaskHandler
{
private readonly ManualResetEvent _waitForStop = new ManualResetEvent(initialState: false);
private bool enabledCompoundErrorPropagation { get; } =
PowerShellWorkerConfiguration.GetBoolean(Utils.PropagateCompoundErrorsEnvVariable) ?? false;

public void StopAndInitiateDurableTaskOrReplay(
DurableTask task,
Expand Down Expand Up @@ -102,11 +104,49 @@ public void StopAndInitiateDurableTaskOrReplay(
}
}

public static bool IsNonTerminalTaskFailedEvent(
DurableTask task,
OrchestrationContext context,
HistoryEvent scheduledHistoryEvent,
HistoryEvent completedHistoryEvent
)
{

if (task is ActivityInvocationTask activity && completedHistoryEvent.EventType == HistoryEventType.TaskFailed)
{
if (activity.RetryOptions == null)
{
return false;
}
else
{
Action<string> NoOp = _ => { };
// RetryProcessor assumes events have not been processed,
// it will re-assign the `IsProcessed` flag for these events
// it its execution
scheduledHistoryEvent.IsProcessed = false;
completedHistoryEvent.IsProcessed = false;

var isFinalFailureEvent =
RetryProcessor.Process(
context.History,
scheduledHistoryEvent,
activity.RetryOptions.MaxNumberOfAttempts,
onSuccess: NoOp,
onFinalFailure: NoOp);
return !isFinalFailureEvent;
}
}
return false;
}

// Waits for all of the given DurableTasks to complete
public void WaitAll(
IReadOnlyCollection<DurableTask> tasksToWaitFor,
OrchestrationContext context,
Action<object> output)
Action<object> output,
Action<string> onFailure
)
{
context.OrchestrationActionCollector.NextBatch();

Expand All @@ -127,6 +167,13 @@ public void WaitAll(
}

completedHistoryEvent.IsProcessed = true;

if (IsNonTerminalTaskFailedEvent(task, context, scheduledHistoryEvent, completedHistoryEvent))
{
// do not count this as a terminal event for this task
continue;
}

completedEvents.Add(completedHistoryEvent);
}

Expand All @@ -142,6 +189,10 @@ public void WaitAll(
{
output(GetEventResult(completedHistoryEvent));
}
if (completedHistoryEvent.EventType is HistoryEventType.TaskFailed && enabledCompoundErrorPropagation)
{
onFailure(completedHistoryEvent.Reason);
}
}
}
else
Expand Down Expand Up @@ -175,6 +226,17 @@ public void WaitAny(
scheduledHistoryEvent.IsPlayed = true;
}

if (completedHistoryEvent == null)
{
continue;
}

if (IsNonTerminalTaskFailedEvent(task, context, scheduledHistoryEvent, completedHistoryEvent))
{
// do not count this as a terminal event for this task
completedHistoryEvent = null;
}

if (completedHistoryEvent != null)
{
completedTasks.Add(task);
Expand Down
2 changes: 1 addition & 1 deletion src/DurableSDK/Tasks/ActivityInvocationTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class ActivityInvocationTask : DurableTask

private object Input { get; }

private RetryOptions RetryOptions { get; }
internal RetryOptions RetryOptions { get; }

internal ActivityInvocationTask(string functionName, object functionInput, RetryOptions retryOptions)
{
Expand Down
1 change: 1 addition & 0 deletions src/Utility/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ internal class Utils
internal const string IsOrchestrationFailureKey = "IsOrchestrationFailure";
internal const string TracePipelineObjectCmdlet = "Microsoft.Azure.Functions.PowerShellWorker\\Trace-PipelineObject";
internal const string ExternalDurableSdkEnvVariable = "ExternalDurablePowerShellSDK";
internal const string PropagateCompoundErrorsEnvVariable = "PropagateCompoundErrors";

internal readonly static object BoxedTrue = (object)true;
internal readonly static object BoxedFalse = (object)false;
Expand Down
2 changes: 1 addition & 1 deletion test/Unit/Durable/CurrentUtcDateTimeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void CurrentUtcDateTime_UpdatesToNextOrchestratorStartedTimestamp_IfAllAc
DurableTestUtilities.EmulateStop(durableTaskHandler);
}

durableTaskHandler.WaitAll(tasksToWaitFor, context, output => allOutput.Add(output));
durableTaskHandler.WaitAll(tasksToWaitFor, context, output => allOutput.Add(output), _ => { });

if (allCompleted)
{
Expand Down
8 changes: 4 additions & 4 deletions test/Unit/Durable/DurableTaskHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void WaitAll_OutputsTaskResults_WhenAllTasksCompleted(
var allOutput = new List<object>();

var durableTaskHandler = new DurableTaskHandler();
durableTaskHandler.WaitAll(tasksToWaitFor, orchestrationContext, output => { allOutput.Add(output); });
durableTaskHandler.WaitAll(tasksToWaitFor, orchestrationContext, output => { allOutput.Add(output); }, _ => { });

Assert.Equal(new[] { "Result1", "Result2", "Result3" }, allOutput);
VerifyNoOrchestrationActionAdded(orchestrationContext);
Expand Down Expand Up @@ -88,7 +88,7 @@ public void WaitAll_OutputsNothing_WhenAnyTaskIsNotCompleted(
DurableTestUtilities.EmulateStop(durableTaskHandler);

durableTaskHandler.WaitAll(tasksToWaitFor, orchestrationContext,
_ => { Assert.True(false, "Unexpected output"); });
_ => { Assert.True(false, "Unexpected output"); }, _ => { });

VerifyNoOrchestrationActionAdded(orchestrationContext);
}
Expand Down Expand Up @@ -122,7 +122,7 @@ public void WaitAll_WaitsForStop_WhenAnyTaskIsNotCompleted(bool scheduledAndComp
expectedWaitForStop,
() =>
{
durableTaskHandler.WaitAll(tasksToWaitFor, orchestrationContext, _ => { });
durableTaskHandler.WaitAll(tasksToWaitFor, orchestrationContext, _ => { }, _ => { });
});
}

Expand Down Expand Up @@ -242,7 +242,7 @@ public void WaitAll_And_WaitAny_StartNewActivityBatch(bool invokeWaitAll, bool i
if (invokeWaitAll)
{
durableTaskHandler.Stop(); // just to avoid the next call getting stuck waiting for a stop event
durableTaskHandler.WaitAll(new DurableTask[0], orchestrationContext, output: _ => {});
durableTaskHandler.WaitAll(new DurableTask[0], orchestrationContext, output: _ => {}, _ => { });
}

if (invokeWaitAny)
Expand Down