Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add orchestrator scheduled actions check when suspended #1024

Merged
merged 3 commits into from
Jan 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/DurableTask.Core/TaskOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace DurableTask.Core
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core.Command;
Expand All @@ -35,6 +36,7 @@ internal class TaskOrchestrationContext : OrchestrationContext
private bool executionCompletedOrTerminated;
private int idCounter;
private readonly Queue<HistoryEvent> eventsWhileSuspended;
private readonly IDictionary<int, OrchestratorAction> suspendedActionsMap;

public bool IsSuspended { get; private set; }

Expand Down Expand Up @@ -63,6 +65,7 @@ public TaskOrchestrationContext(
this.EntityParameters = entityParameters;
ErrorPropagationMode = errorPropagationMode;
this.eventsWhileSuspended = new Queue<HistoryEvent>();
this.suspendedActionsMap = new SortedDictionary<int, OrchestratorAction>();
}

public IEnumerable<OrchestratorAction> OrchestratorActions => this.orchestratorActionsMap.Values;
Expand Down Expand Up @@ -568,11 +571,35 @@ public void HandleEventWhileSuspended(HistoryEvent historyEvent)
public void HandleExecutionSuspendedEvent(ExecutionSuspendedEvent suspendedEvent)
{
this.IsSuspended = true;

// When the orchestrator is suspended, a task could potentially be added to the orchestratorActionsMap.
// This could lead to the task being executed repeatedly without completion until the orchestrator is resumed.
// To prevent this scenario, check if orchestratorActionsMap is empty before proceeding.
if (this.orchestratorActionsMap.Any())
nytian marked this conversation as resolved.
Show resolved Hide resolved
{
// If not, store its contents to a temporary dictionary to allow processing of the task later when orchestrator resumes.
foreach (var pair in this.orchestratorActionsMap)
{
this.suspendedActionsMap.Add(pair.Key, pair.Value);
}
this.orchestratorActionsMap.Clear();
}
}

public void HandleExecutionResumedEvent(ExecutionResumedEvent resumedEvent, Action<HistoryEvent> eventProcessor)
{
this.IsSuspended = false;

// Add the actions stored in the suspendedActionsMap before back to orchestratorActionsMap to ensure proper sequencing.
if (this.suspendedActionsMap.Any())
nytian marked this conversation as resolved.
Show resolved Hide resolved
{
foreach(var pair in this.suspendedActionsMap)
{
this.orchestratorActionsMap.Add(pair.Key, pair.Value);
}
this.suspendedActionsMap.Clear();
}

bachuv marked this conversation as resolved.
Show resolved Hide resolved
while (eventsWhileSuspended.Count > 0)
{
eventProcessor(eventsWhileSuspended.Dequeue());
Expand Down