From f955c48c348decefefca21f14e5c81e47cde7b3f Mon Sep 17 00:00:00 2001 From: Kanodia Adarsh Date: Fri, 18 Jan 2019 18:54:33 -0800 Subject: [PATCH 1/8] Optimize Continue As New And Solve Missed Events --- .../AzureStorageScenarioTests.cs | 8 +- .../AzureStorageOrchestrationService.cs | 10 +- ...zureStorageOrchestrationServiceSettings.cs | 5 + ...OrchestrationCompleteOrchestratorAction.cs | 9 + src/DurableTask.Core/IOrchestrationService.cs | 5 + .../TaskOrchestrationDispatcherSettings.cs | 6 + .../TaskOrchestrationContext.cs | 7 + .../TaskOrchestrationDispatcher.cs | 279 ++++++++++-------- .../TaskOrchestrationExecutor.cs | 15 +- .../LocalOrchestrationService.cs | 5 + .../ServiceBusOrchestrationService.cs | 26 +- .../ServiceBusOrchestrationServiceTests.cs | 50 ++++ .../SimpleOrchestrations.cs | 48 +++ 13 files changed, 337 insertions(+), 136 deletions(-) diff --git a/Test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/Test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index a519b6c31..43ef48531 100644 --- a/Test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/Test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -542,13 +542,9 @@ public async Task ActorOrchestration(bool enableExtendedSessions) // TODO: Sleeping to avoid a race condition where multiple ContinueAsNew messages // are processed by the same instance at the same time, resulting in a corrupt // storage failure in DTFx. - await Task.Delay(2000); await client.RaiseEventAsync("operation", "incr"); - await Task.Delay(2000); await client.RaiseEventAsync("operation", "incr"); - await Task.Delay(2000); await client.RaiseEventAsync("operation", "decr"); - await Task.Delay(2000); await client.RaiseEventAsync("operation", "incr"); await Task.Delay(2000); @@ -609,7 +605,7 @@ public async Task ActorOrchestrationDeleteAllLargeMessageBlobs(bool enableExtend // Ideally there would only be three blobs at the end of the test. // TODO: https://github.com/Azure/azure-functions-durable-extension/issues/509 - Assert.AreEqual(9, blobCount); + Assert.AreEqual(3, blobCount); await client.PurgeInstanceHistoryByTimePeriod( startDateTime, @@ -1963,7 +1959,7 @@ async Task WaitForOperation() public override void OnEvent(OrchestrationContext context, string name, string input) { Assert.AreEqual("operation", name, true, "Unknown signal recieved..."); - if (this.waitForOperationHandle != null) + if (this.waitForOperationHandle != null && !this.waitForOperationHandle.Task.IsCompleted) { this.waitForOperationHandle.SetResult(input); } diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index d280b7f8b..dfd04e741 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -274,6 +274,14 @@ public int MaxConcurrentTaskActivityWorkItems get { return this.settings.MaxConcurrentTaskActivityWorkItems; } } + /// + /// Will not carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew + /// + public bool SkipEventsOnContinuation + { + get { return this.settings.SkipEventsOnContinuation; } + } + // We always leave the dispatcher counts at one unless we can find a customer workload that requires more. /// public int TaskActivityDispatcherCount { get; } = 1; @@ -816,7 +824,7 @@ public async Task CompleteTaskOrchestrationWorkItemAsync( } session.StartNewLogicalTraceScope(); - OrchestrationRuntimeState runtimeState = workItem.OrchestrationRuntimeState; + OrchestrationRuntimeState runtimeState = newOrchestrationRuntimeState??workItem.OrchestrationRuntimeState; string instanceId = workItem.InstanceId; string executionId = runtimeState.OrchestrationInstance.ExecutionId; diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index fe4ed3c82..5967ddabe 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -139,5 +139,10 @@ public class AzureStorageOrchestrationServiceSettings /// If provided, this is used to connect to Azure Storage /// public StorageAccountDetails StorageAccountDetails { get; set; } + + /// + /// Will not carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew + /// + public bool SkipEventsOnContinuation { get; set; } = false; } } diff --git a/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs b/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs index 1b696cdd2..0b32404e5 100644 --- a/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs +++ b/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs @@ -11,10 +11,17 @@ // limitations under the License. // ---------------------------------------------------------------------------------- +using System.Collections.Generic; +using DurableTask.Core.History; + namespace DurableTask.Core.Command { internal class OrchestrationCompleteOrchestratorAction : OrchestratorAction { + public OrchestrationCompleteOrchestratorAction() + { + CarryOverEvents = new List(); + } public OrchestrationStatus OrchestrationStatus; public override OrchestratorActionType OrchestratorActionType => OrchestratorActionType.OrchestrationComplete; @@ -24,5 +31,7 @@ internal class OrchestrationCompleteOrchestratorAction : OrchestratorAction public string Details { get; set; } public string NewVersion { get; set; } + + public IList CarryOverEvents { get; set; } } } \ No newline at end of file diff --git a/src/DurableTask.Core/IOrchestrationService.cs b/src/DurableTask.Core/IOrchestrationService.cs index 197a8956d..ea0d5c644 100644 --- a/src/DurableTask.Core/IOrchestrationService.cs +++ b/src/DurableTask.Core/IOrchestrationService.cs @@ -97,6 +97,11 @@ public interface IOrchestrationService /// int MaxConcurrentTaskOrchestrationWorkItems { get; } + /// + /// Will not carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew + /// + bool SkipEventsOnContinuation { get; } + /// /// Wait for the next orchestration work item and return the orchestration work item /// diff --git a/src/DurableTask.Core/Settings/TaskOrchestrationDispatcherSettings.cs b/src/DurableTask.Core/Settings/TaskOrchestrationDispatcherSettings.cs index e415749e9..533719c1f 100644 --- a/src/DurableTask.Core/Settings/TaskOrchestrationDispatcherSettings.cs +++ b/src/DurableTask.Core/Settings/TaskOrchestrationDispatcherSettings.cs @@ -28,6 +28,7 @@ public TaskOrchestrationDispatcherSettings() DispatcherCount = FrameworkConstants.OrchestrationDefaultDispatcherCount; MaxConcurrentOrchestrations = FrameworkConstants.OrchestrationDefaultMaxConcurrentItems; CompressOrchestrationState = false; + SkipEventsOnContinuation = false; } /// @@ -55,6 +56,11 @@ public TaskOrchestrationDispatcherSettings() /// public bool CompressOrchestrationState { get; set; } + /// + /// Will not carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew + /// + public bool SkipEventsOnContinuation { get; set; } + internal TaskOrchestrationDispatcherSettings Clone() { return new TaskOrchestrationDispatcherSettings diff --git a/src/DurableTask.Core/TaskOrchestrationContext.cs b/src/DurableTask.Core/TaskOrchestrationContext.cs index 54da1f9e9..792da6aa4 100644 --- a/src/DurableTask.Core/TaskOrchestrationContext.cs +++ b/src/DurableTask.Core/TaskOrchestrationContext.cs @@ -35,6 +35,12 @@ internal class TaskOrchestrationContext : OrchestrationContext bool executionTerminated; int idCounter; + public bool HasContinueAsNew => continueAsNew != null; + + public void AddEventToNextIteration(HistoryEvent he){ + continueAsNew.CarryOverEvents.Add(he); + } + public TaskOrchestrationContext(OrchestrationInstance orchestrationInstance, TaskScheduler taskScheduler) { Utils.UnusedParameter(taskScheduler); @@ -54,6 +60,7 @@ public TaskOrchestrationContext(OrchestrationInstance orchestrationInstance, Tas internal void ClearPendingActions() { this.orchestratorActionsMap.Clear(); + continueAsNew = null; } public override async Task ScheduleTask(string name, string version, diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 31c460f77..5835d98e2 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -192,11 +192,16 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work ExecutionStartedEvent continueAsNewExecutionStarted = null; TaskMessage continuedAsNewMessage = null; + IList carryOverEvents = null; OrchestrationRuntimeState runtimeState = workItem.OrchestrationRuntimeState; runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + OrchestrationRuntimeState oldOrchestrationRuntimeState = runtimeState; + + OrchestrationState instanceState = null; + if (!ReconcileMessagesWithState(workItem)) { // TODO : mark an orchestration as faulted if there is data corruption @@ -209,169 +214,195 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work } else { - TraceHelper.TraceInstance( + do + { + continuedAsNew = false; + continuedAsNewMessage = null; + + TraceHelper.TraceInstance( TraceEventType.Verbose, "TaskOrchestrationDispatcher-ExecuteUserOrchestration-Begin", runtimeState.OrchestrationInstance, "Executing user orchestration: {0}", DataConverter.Serialize(runtimeState.GetOrchestrationRuntimeStateDump(), true)); - if (workItem.Cursor == null) - { - workItem.Cursor = await ExecuteOrchestrationAsync(runtimeState); - } - else - { - await ResumeOrchestrationAsync(workItem.Cursor); - } - - IReadOnlyList decisions = workItem.Cursor.LatestDecisions.ToList(); + if (workItem.Cursor == null) + { + workItem.Cursor = await ExecuteOrchestrationAsync(runtimeState); + } + else + { + await ResumeOrchestrationAsync(workItem.Cursor); + } - TraceHelper.TraceInstance( - TraceEventType.Information, - "TaskOrchestrationDispatcher-ExecuteUserOrchestration-End", - runtimeState.OrchestrationInstance, - "Executed user orchestration. Received {0} orchestrator actions: {1}", - decisions.Count, - string.Join(", ", decisions.Select(d => d.Id + ":" + d.OrchestratorActionType))); + IReadOnlyList decisions = workItem.Cursor.LatestDecisions.ToList(); - foreach (OrchestratorAction decision in decisions) - { TraceHelper.TraceInstance( TraceEventType.Information, - "TaskOrchestrationDispatcher-ProcessOrchestratorAction", + "TaskOrchestrationDispatcher-ExecuteUserOrchestration-End", runtimeState.OrchestrationInstance, - "Processing orchestrator action of type {0}", - decision.OrchestratorActionType); - switch (decision.OrchestratorActionType) - { - case OrchestratorActionType.ScheduleOrchestrator: - messagesToSend.Add( - ProcessScheduleTaskDecision((ScheduleTaskOrchestratorAction)decision, runtimeState, - IncludeParameters)); - break; - case OrchestratorActionType.CreateTimer: - var timerOrchestratorAction = (CreateTimerOrchestratorAction)decision; - timerMessages.Add(ProcessCreateTimerDecision(timerOrchestratorAction, runtimeState)); - break; - case OrchestratorActionType.CreateSubOrchestration: - var createSubOrchestrationAction = (CreateSubOrchestrationAction)decision; - subOrchestrationMessages.Add( - ProcessCreateSubOrchestrationInstanceDecision(createSubOrchestrationAction, - runtimeState, IncludeParameters)); - break; - case OrchestratorActionType.OrchestrationComplete: - TaskMessage workflowInstanceCompletedMessage = - ProcessWorkflowCompletedTaskDecision((OrchestrationCompleteOrchestratorAction)decision, runtimeState, IncludeDetails, out continuedAsNew); - if (workflowInstanceCompletedMessage != null) - { - // Send complete message to parent workflow or to itself to start a new execution - // Store the event so we can rebuild the state - if (continuedAsNew) - { - continuedAsNewMessage = workflowInstanceCompletedMessage; - continueAsNewExecutionStarted = workflowInstanceCompletedMessage.Event as ExecutionStartedEvent; - } - else - { - subOrchestrationMessages.Add(workflowInstanceCompletedMessage); - } - } + "Executed user orchestration. Received {0} orchestrator actions: {1}", + decisions.Count, + string.Join(", ", decisions.Select(d => d.Id + ":" + d.OrchestratorActionType))); - isCompleted = !continuedAsNew; - break; - default: - throw TraceHelper.TraceExceptionInstance( - TraceEventType.Error, - "TaskOrchestrationDispatcher-UnsupportedDecisionType", - runtimeState.OrchestrationInstance, - new NotSupportedException("decision type not supported")); - } - - // Underlying orchestration service provider may have a limit of messages per call, to avoid the situation - // we keep on asking the provider if message count is ok and stop processing new decisions if not. - // - // We also put in a fake timer to force next orchestration task for remaining messages - int totalMessages = messagesToSend.Count + subOrchestrationMessages.Count + timerMessages.Count; - if (this.orchestrationService.IsMaxMessageCountExceeded(totalMessages, runtimeState)) + foreach (OrchestratorAction decision in decisions) { TraceHelper.TraceInstance( TraceEventType.Information, - "TaskOrchestrationDispatcher-MaxMessageCountReached", + "TaskOrchestrationDispatcher-ProcessOrchestratorAction", runtimeState.OrchestrationInstance, - "MaxMessageCount reached. Adding timer to process remaining events in next attempt."); + "Processing orchestrator action of type {0}", + decision.OrchestratorActionType); + switch (decision.OrchestratorActionType) + { + case OrchestratorActionType.ScheduleOrchestrator: + messagesToSend.Add( + ProcessScheduleTaskDecision((ScheduleTaskOrchestratorAction)decision, runtimeState, + IncludeParameters)); + break; + case OrchestratorActionType.CreateTimer: + var timerOrchestratorAction = (CreateTimerOrchestratorAction)decision; + timerMessages.Add(ProcessCreateTimerDecision(timerOrchestratorAction, runtimeState)); + break; + case OrchestratorActionType.CreateSubOrchestration: + var createSubOrchestrationAction = (CreateSubOrchestrationAction)decision; + subOrchestrationMessages.Add( + ProcessCreateSubOrchestrationInstanceDecision(createSubOrchestrationAction, + runtimeState, IncludeParameters)); + break; + case OrchestratorActionType.OrchestrationComplete: + OrchestrationCompleteOrchestratorAction completeDecision = (OrchestrationCompleteOrchestratorAction)decision; + TaskMessage workflowInstanceCompletedMessage = + ProcessWorkflowCompletedTaskDecision(completeDecision, runtimeState, IncludeDetails, out continuedAsNew); + if (workflowInstanceCompletedMessage != null) + { + // Send complete message to parent workflow or to itself to start a new execution + // Store the event so we can rebuild the state + carryOverEvents = null; + if (continuedAsNew) + { + continuedAsNewMessage = workflowInstanceCompletedMessage; + continueAsNewExecutionStarted = workflowInstanceCompletedMessage.Event as ExecutionStartedEvent; + if (completeDecision.CarryOverEvents.Any()) + { + carryOverEvents = completeDecision.CarryOverEvents.ToList(); + completeDecision.CarryOverEvents.Clear(); + } + } + else + { + subOrchestrationMessages.Add(workflowInstanceCompletedMessage); + } + } - if (isCompleted || continuedAsNew) + isCompleted = !continuedAsNew; + break; + default: + throw TraceHelper.TraceExceptionInstance( + TraceEventType.Error, + "TaskOrchestrationDispatcher-UnsupportedDecisionType", + runtimeState.OrchestrationInstance, + new NotSupportedException("decision type not supported")); + } + + // Underlying orchestration service provider may have a limit of messages per call, to avoid the situation + // we keep on asking the provider if message count is ok and stop processing new decisions if not. + // + // We also put in a fake timer to force next orchestration task for remaining messages + int totalMessages = messagesToSend.Count + subOrchestrationMessages.Count + timerMessages.Count; + if (this.orchestrationService.IsMaxMessageCountExceeded(totalMessages, runtimeState)) { TraceHelper.TraceInstance( TraceEventType.Information, - "TaskOrchestrationDispatcher-OrchestrationAlreadyCompleted", + "TaskOrchestrationDispatcher-MaxMessageCountReached", runtimeState.OrchestrationInstance, - "Orchestration already completed. Skip adding timer for splitting messages."); - break; - } + "MaxMessageCount reached. Adding timer to process remaining events in next attempt."); - var dummyTimer = new CreateTimerOrchestratorAction - { - Id = FrameworkConstants.FakeTimerIdToSplitDecision, - FireAt = DateTime.UtcNow - }; + if (isCompleted || continuedAsNew) + { + TraceHelper.TraceInstance( + TraceEventType.Information, + "TaskOrchestrationDispatcher-OrchestrationAlreadyCompleted", + runtimeState.OrchestrationInstance, + "Orchestration already completed. Skip adding timer for splitting messages."); + break; + } - timerMessages.Add(ProcessCreateTimerDecision(dummyTimer, runtimeState)); - isInterrupted = true; - break; + var dummyTimer = new CreateTimerOrchestratorAction + { + Id = FrameworkConstants.FakeTimerIdToSplitDecision, + FireAt = DateTime.UtcNow + }; + + timerMessages.Add(ProcessCreateTimerDecision(dummyTimer, runtimeState)); + isInterrupted = true; + break; + } } - } - } - // finish up processing of the work item - if (!continuedAsNew) - { - runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); - } + // finish up processing of the work item + if (!continuedAsNew) + { + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + } - OrchestrationRuntimeState newOrchestrationRuntimeState = workItem.OrchestrationRuntimeState; + if (isCompleted) + { + TraceHelper.TraceSession(TraceEventType.Information, "TaskOrchestrationDispatcher-DeletingSessionState", workItem.InstanceId, "Deleting session state"); + if (runtimeState.ExecutionStartedEvent != null) + { + instanceState = Utils.BuildOrchestrationState(runtimeState); + } + } + else + { - OrchestrationState instanceState = null; + if (continuedAsNew) + { + TraceHelper.TraceSession( + TraceEventType.Information, + "TaskOrchestrationDispatcher-UpdatingStateForContinuation", + workItem.InstanceId, + "Updating state for continuation"); + runtimeState = new OrchestrationRuntimeState(); + runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); + runtimeState.AddEvent(continueAsNewExecutionStarted); + if (carryOverEvents != null) + { + foreach (var historyEvent in carryOverEvents) + { + runtimeState.AddEvent(historyEvent); + } + } + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + workItem.OrchestrationRuntimeState = runtimeState; - if (isCompleted) - { - TraceHelper.TraceSession(TraceEventType.Information, "TaskOrchestrationDispatcher-DeletingSessionState", workItem.InstanceId, "Deleting session state"); - if (newOrchestrationRuntimeState.ExecutionStartedEvent != null) - { - instanceState = Utils.BuildOrchestrationState(newOrchestrationRuntimeState); - } + TaskOrchestration orchestration = this.objectManager.GetObject(runtimeState.Name, continueAsNewExecutionStarted.Version); + + workItem.Cursor = new OrchestrationExecutionCursor(runtimeState, orchestration, new TaskOrchestrationExecutor(runtimeState, workItem.Cursor.TaskOrchestration, false), null); + await orchestrationService.RenewTaskOrchestrationWorkItemLockAsync(workItem); + } - newOrchestrationRuntimeState = null; + instanceState = Utils.BuildOrchestrationState(runtimeState); + } + }while (continuedAsNew); + } - else - { - instanceState = Utils.BuildOrchestrationState(newOrchestrationRuntimeState); - if (continuedAsNew) - { - TraceHelper.TraceSession( - TraceEventType.Information, - "TaskOrchestrationDispatcher-UpdatingStateForContinuation", - workItem.InstanceId, - "Updating state for continuation"); - newOrchestrationRuntimeState = new OrchestrationRuntimeState(); - newOrchestrationRuntimeState.AddEvent(new OrchestratorStartedEvent(-1)); - newOrchestrationRuntimeState.AddEvent(continueAsNewExecutionStarted); - newOrchestrationRuntimeState.AddEvent(new OrchestratorCompletedEvent(-1)); - } - } + workItem.OrchestrationRuntimeState = oldOrchestrationRuntimeState; await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync( workItem, - newOrchestrationRuntimeState, + runtimeState, continuedAsNew ? null : messagesToSend, subOrchestrationMessages, continuedAsNew ? null : timerMessages, continuedAsNewMessage, instanceState); - return isCompleted || continuedAsNew || isInterrupted; + workItem.OrchestrationRuntimeState = runtimeState; + + return isCompleted|| continuedAsNew || isInterrupted; } async Task ExecuteOrchestrationAsync(OrchestrationRuntimeState runtimeState) @@ -391,7 +422,7 @@ async Task ExecuteOrchestrationAsync(Orchestration dispatchContext.SetProperty(taskOrchestration); dispatchContext.SetProperty(runtimeState); - var executor = new TaskOrchestrationExecutor(runtimeState, taskOrchestration); + var executor = new TaskOrchestrationExecutor(runtimeState, taskOrchestration, false); IEnumerable decisions = null; await this.dispatchPipeline.RunAsync(dispatchContext, _ => diff --git a/src/DurableTask.Core/TaskOrchestrationExecutor.cs b/src/DurableTask.Core/TaskOrchestrationExecutor.cs index 5264c3a54..4db281bfe 100644 --- a/src/DurableTask.Core/TaskOrchestrationExecutor.cs +++ b/src/DurableTask.Core/TaskOrchestrationExecutor.cs @@ -28,15 +28,17 @@ internal class TaskOrchestrationExecutor readonly TaskScheduler decisionScheduler; readonly OrchestrationRuntimeState orchestrationRuntimeState; readonly TaskOrchestration taskOrchestration; + readonly bool skipCarryOverEvents; Task result; public TaskOrchestrationExecutor(OrchestrationRuntimeState orchestrationRuntimeState, - TaskOrchestration taskOrchestration) + TaskOrchestration taskOrchestration, bool skipCarryOverEvents) { this.decisionScheduler = new SynchronousTaskScheduler(); this.context = new TaskOrchestrationContext(orchestrationRuntimeState.OrchestrationInstance, this.decisionScheduler); this.orchestrationRuntimeState = orchestrationRuntimeState; this.taskOrchestration = taskOrchestration; + this.skipCarryOverEvents = skipCarryOverEvents; } public bool IsCompleted => this.result != null && (this.result.IsCompleted || this.result.IsFaulted); @@ -151,8 +153,15 @@ void ProcessEvent(HistoryEvent historyEvent) this.context.HandleTimerFiredEvent((TimerFiredEvent)historyEvent); break; case EventType.EventRaised: - var eventRaisedEvent = (EventRaisedEvent)historyEvent; - this.taskOrchestration.RaiseEvent(this.context, eventRaisedEvent.Name, eventRaisedEvent.Input); + if (this.skipCarryOverEvents||!this.context.HasContinueAsNew) + { + var eventRaisedEvent = (EventRaisedEvent)historyEvent; + this.taskOrchestration.RaiseEvent(this.context, eventRaisedEvent.Name, eventRaisedEvent.Input); + } + else + { + this.context.AddEventToNextIteration(historyEvent); + } break; } } diff --git a/src/DurableTask.Emulator/LocalOrchestrationService.cs b/src/DurableTask.Emulator/LocalOrchestrationService.cs index db1c6ef5b..d9481c5d6 100644 --- a/src/DurableTask.Emulator/LocalOrchestrationService.cs +++ b/src/DurableTask.Emulator/LocalOrchestrationService.cs @@ -503,6 +503,11 @@ public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem work /// public int TaskActivityDispatcherCount => 1; + /// + /// Will not carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew + /// + public bool SkipEventsOnContinuation => false; + /// public int MaxConcurrentTaskActivityWorkItems => this.MaxConcurrentWorkItems; diff --git a/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs b/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs index 32a42dcd7..9c016a09e 100644 --- a/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs +++ b/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs @@ -455,6 +455,11 @@ public int GetDelayInSecondsAfterOnFetchException(Exception exception) /// public int MaxConcurrentTaskOrchestrationWorkItems => this.Settings.TaskOrchestrationDispatcherSettings.MaxConcurrentOrchestrations; + /// + /// Will not carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew + /// + public bool SkipEventsOnContinuation => this.Settings.TaskOrchestrationDispatcherSettings.SkipEventsOnContinuation; + /// /// Wait for the next orchestration work item and return the orchestration work item /// @@ -739,7 +744,6 @@ public async Task CompleteTaskOrchestrationWorkItemAsync( if (this.InstanceStore != null) { List trackingMessages = await CreateTrackingMessagesAsync(runtimeState, sessionState.SequenceNumber); - TraceHelper.TraceInstance( TraceEventType.Information, "ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItem-TrackingMessages", @@ -753,6 +757,24 @@ public async Task CompleteTaskOrchestrationWorkItemAsync( this.ServiceStats.TrackingDispatcherStats.MessageBatchesSent.Increment(); this.ServiceStats.TrackingDispatcherStats.MessagesSent.Increment(trackingMessages.Count); } + + if (newOrchestrationRuntimeState != null && runtimeState != newOrchestrationRuntimeState) + { + trackingMessages = await CreateTrackingMessagesAsync(newOrchestrationRuntimeState, sessionState.SequenceNumber); + TraceHelper.TraceInstance( + TraceEventType.Information, + "ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItem-TrackingMessages", + newOrchestrationRuntimeState.OrchestrationInstance, + "Created {0} tracking messages", trackingMessages.Count); + + if (trackingMessages.Count > 0) + { + await this.trackingSender.SendBatchAsync(trackingMessages.Select(m => m.Message)); + LogSentMessages(session, "Tracking messages", trackingMessages); + this.ServiceStats.TrackingDispatcherStats.MessageBatchesSent.Increment(); + this.ServiceStats.TrackingDispatcherStats.MessagesSent.Increment(trackingMessages.Count); + } + } } } @@ -1510,7 +1532,7 @@ async Task TrySetSessionStateAsync( var isSessionSizeThresholdExceeded = false; - if (newOrchestrationRuntimeState == null) + if (newOrchestrationRuntimeState == null || newOrchestrationRuntimeState.OrchestrationStatus != OrchestrationStatus.Running) { await session.SetStateAsync(null); return true; diff --git a/test/DurableTask.ServiceBus.Tests/ServiceBusOrchestrationServiceTests.cs b/test/DurableTask.ServiceBus.Tests/ServiceBusOrchestrationServiceTests.cs index 8f97dcf40..db71920b4 100644 --- a/test/DurableTask.ServiceBus.Tests/ServiceBusOrchestrationServiceTests.cs +++ b/test/DurableTask.ServiceBus.Tests/ServiceBusOrchestrationServiceTests.cs @@ -20,6 +20,7 @@ namespace DurableTask.ServiceBus.Tests using DurableTask.Core.Exceptions; using DurableTask.Test.Orchestrations; using Microsoft.VisualStudio.TestTools.UnitTesting; + using Newtonsoft.Json.Linq; [TestClass] public class ServiceBusOrchestrationServiceTests @@ -70,6 +71,55 @@ await this.taskHub.AddTaskOrchestrations(typeof(SimplestGreetingsOrchestration)) "Orchestration Result is wrong!!!"); } + [TestMethod] + public async Task ActorOrchestrationTest() + { + var sbService = (ServiceBusOrchestrationService)this.taskHub.orchestrationService; + string name = NameVersionHelper.GetDefaultName(typeof(CounterOrchestration)); + string version = NameVersionHelper.GetDefaultVersion(typeof(CounterOrchestration)); + + await this.taskHub.AddTaskOrchestrations(typeof(CounterOrchestration)) + .StartAsync(); + + int initialValue = 0; + OrchestrationInstance id = await this.client.CreateOrchestrationInstanceAsync(typeof(CounterOrchestration), initialValue); + + // Need to wait for the instance to start before sending events to it. + // TODO: This requirement may not be ideal and should be revisited. + await TestHelpers.WaitForInstanceAsync(this.client, id, 10, waitForCompletion: false); + + OrchestrationInstance temp = new OrchestrationInstance() { InstanceId = id.InstanceId }; + // Perform some operations + await this.client.RaiseEventAsync(temp, "operation", "incr"); + + // TODO: Sleeping to avoid a race condition where multiple ContinueAsNew messages + // are processed by the same instance at the same time, resulting in a corrupt + // storage failure in DTFx. + await this.client.RaiseEventAsync(temp, "operation", "incr"); + await this.client.RaiseEventAsync(temp, "operation", "decr"); + await this.client.RaiseEventAsync(temp, "operation", "incr"); + await this.client.RaiseEventAsync(temp, "operation", "incr"); + await this.client.RaiseEventAsync(temp, "operation", "end"); + await Task.Delay(4000); + + // Make sure it's still running and didn't complete early (or fail). + var status = await client.GetOrchestrationStateAsync(id); + Assert.IsTrue( + status?.OrchestrationStatus == OrchestrationStatus.Running || + status?.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew); + + // The end message will cause the actor to complete itself. + await this.client.RaiseEventAsync(temp, "operation", "end"); + + status = await client.WaitForOrchestrationAsync(temp, TimeSpan.FromSeconds(10)); + + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + Assert.AreEqual(3, JToken.Parse(status?.Output)); + + // When using ContinueAsNew, the original input is discarded and replaced with the most recent state. + Assert.AreNotEqual(initialValue, JToken.Parse(status?.Input)); + } + [TestMethod] public async Task SimplestGreetingsJumpStartDelayTest() { diff --git a/test/DurableTask.Test.Orchestrations/SimpleOrchestrations.cs b/test/DurableTask.Test.Orchestrations/SimpleOrchestrations.cs index 0987754db..08f42ded8 100644 --- a/test/DurableTask.Test.Orchestrations/SimpleOrchestrations.cs +++ b/test/DurableTask.Test.Orchestrations/SimpleOrchestrations.cs @@ -213,4 +213,52 @@ protected override int Execute(TaskContext context, string input) return GenerationCount; } } + + public sealed class CounterOrchestration : TaskOrchestration + { + TaskCompletionSource waitForOperationHandle; + + public override async Task RunTask(OrchestrationContext context, int currentValue) + { + string operation = await this.WaitForOperation(); + + bool done = false; + switch (operation?.ToLowerInvariant()) + { + case "incr": + currentValue++; + break; + case "decr": + currentValue--; + break; + case "end": + done = true; + break; + } + + if (!done) + { + context.ContinueAsNew(currentValue); + } + + return currentValue; + + } + + async Task WaitForOperation() + { + this.waitForOperationHandle = new TaskCompletionSource(); + string operation = await this.waitForOperationHandle.Task; + this.waitForOperationHandle = null; + return operation; + } + + public override void OnEvent(OrchestrationContext context, string name, string input) + { + if (this.waitForOperationHandle != null && !this.waitForOperationHandle.Task.IsCompleted) + { + this.waitForOperationHandle.SetResult(input); + } + } + } } From ae1308216b7d3beaa30119f49d8961e37f4806be Mon Sep 17 00:00:00 2001 From: Kanodia Adarsh Date: Fri, 18 Jan 2019 19:03:16 -0800 Subject: [PATCH 2/8] make skip events property driven --- src/DurableTask.Core/TaskOrchestrationDispatcher.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 5835d98e2..9b8dff334 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -379,7 +379,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work TaskOrchestration orchestration = this.objectManager.GetObject(runtimeState.Name, continueAsNewExecutionStarted.Version); - workItem.Cursor = new OrchestrationExecutionCursor(runtimeState, orchestration, new TaskOrchestrationExecutor(runtimeState, workItem.Cursor.TaskOrchestration, false), null); + workItem.Cursor = new OrchestrationExecutionCursor(runtimeState, orchestration, new TaskOrchestrationExecutor(runtimeState, workItem.Cursor.TaskOrchestration, orchestrationService.SkipEventsOnContinuation), null); await orchestrationService.RenewTaskOrchestrationWorkItemLockAsync(workItem); } @@ -422,7 +422,7 @@ async Task ExecuteOrchestrationAsync(Orchestration dispatchContext.SetProperty(taskOrchestration); dispatchContext.SetProperty(runtimeState); - var executor = new TaskOrchestrationExecutor(runtimeState, taskOrchestration, false); + var executor = new TaskOrchestrationExecutor(runtimeState, taskOrchestration, orchestrationService.SkipEventsOnContinuation); IEnumerable decisions = null; await this.dispatchPipeline.RunAsync(dispatchContext, _ => From 8a948cbd7add0715d7b0c9f0164e8f5f712e041b Mon Sep 17 00:00:00 2001 From: Adarsh Kanodia Date: Fri, 18 Jan 2019 22:16:26 -0800 Subject: [PATCH 3/8] Address Code Review and cleanup --- .../AzureStorageScenarioTests.cs | 22 +++++++------------ .../AzureStorageOrchestrationService.cs | 14 ++++++------ ...OrchestrationCompleteOrchestratorAction.cs | 9 ++++---- .../TaskOrchestrationContext.cs | 3 ++- .../TaskOrchestrationDispatcher.cs | 11 ++++++---- .../TaskOrchestrationExecutor.cs | 2 +- .../ServiceBusOrchestrationServiceTests.cs | 4 ---- 7 files changed, 30 insertions(+), 35 deletions(-) diff --git a/Test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/Test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index 43ef48531..15aa3b8f3 100644 --- a/Test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/Test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -173,7 +173,7 @@ public async Task PurgeInstanceHistoryForSingleInstanceWithoutLargeMessageBlobs( await host.StartAsync(); TestOrchestrationClient client = await host.StartOrchestrationAsync(typeof(Orchestrations.Factorial), 110, instanceId); await client.WaitForCompletionAsync(TimeSpan.FromSeconds(30)); - + List historyEvents = await client.GetOrchestrationHistoryAsync(instanceId); Assert.IsTrue(historyEvents.Count > 0); @@ -538,10 +538,6 @@ public async Task ActorOrchestration(bool enableExtendedSessions) // Perform some operations await client.RaiseEventAsync("operation", "incr"); - - // TODO: Sleeping to avoid a race condition where multiple ContinueAsNew messages - // are processed by the same instance at the same time, resulting in a corrupt - // storage failure in DTFx. await client.RaiseEventAsync("operation", "incr"); await client.RaiseEventAsync("operation", "incr"); await client.RaiseEventAsync("operation", "decr"); @@ -603,8 +599,6 @@ public async Task ActorOrchestrationDeleteAllLargeMessageBlobs(bool enableExtend int blobCount = await this.GetBlobCount("test-largemessages", instanceId); - // Ideally there would only be three blobs at the end of the test. - // TODO: https://github.com/Azure/azure-functions-durable-extension/issues/509 Assert.AreEqual(3, blobCount); await client.PurgeInstanceHistoryByTimePeriod( @@ -643,7 +637,7 @@ private async Task> ValidateCharacterCoun // Need to wait for the instance to start before sending events to it. // TODO: This requirement may not be ideal and should be revisited. - OrchestrationState orchestrationState = + OrchestrationState orchestrationState = await client.WaitForStartupAsync(TimeSpan.FromSeconds(10)); // Perform some operations @@ -741,7 +735,7 @@ public async Task RewindOrchestrationsFail() var client1 = await host.StartOrchestrationAsync( typeof(Orchestrations.FactorialOrchestratorFail), - input: 3, + input: 3, instanceId: singletonInstanceId1); var statusFail = await client1.WaitForCompletionAsync(TimeSpan.FromSeconds(30)); @@ -755,7 +749,7 @@ public async Task RewindOrchestrationsFail() input: "Catherine", instanceId: singletonInstanceId2); - await client1.RewindAsync("Rewind failed orchestration only"); + await client1.RewindAsync("Rewind failed orchestration only"); var statusRewind = await client1.WaitForCompletionAsync(TimeSpan.FromSeconds(30)); @@ -1285,7 +1279,7 @@ public async Task LargeBinaryByteMessagePayloads(bool enableExtendedSessions) string currentDirectory = Directory.GetCurrentDirectory(); string originalFilePath = Path.Combine(currentDirectory, originalFileName); byte[] readBytes = File.ReadAllBytes(originalFilePath); - + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.EchoBytes), readBytes); var status = await client.WaitForCompletionAsync(TimeSpan.FromMinutes(1)); @@ -1945,7 +1939,7 @@ public override async Task RunTask(OrchestrationContext context, int curren } return currentValue; - + } async Task WaitForOperation() @@ -1959,7 +1953,7 @@ async Task WaitForOperation() public override void OnEvent(OrchestrationContext context, string name, string input) { Assert.AreEqual("operation", name, true, "Unknown signal recieved..."); - if (this.waitForOperationHandle != null && !this.waitForOperationHandle.Task.IsCompleted) + if (this.waitForOperationHandle != null) { this.waitForOperationHandle.SetResult(input); } @@ -2224,7 +2218,7 @@ protected override string Execute(TaskContext context, string input) } } - internal class HelloFailMultipleActivity : TaskActivity + internal class HelloFailMultipleActivity : TaskActivity { public static bool ShouldFail1 = true; public static bool ShouldFail2 = true; diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index dfd04e741..17643aebd 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -67,7 +67,7 @@ public sealed class AzureStorageOrchestrationService : readonly TableEntityConverter tableEntityConverter; readonly ResettableLazy taskHubCreator; - readonly BlobLeaseManager leaseManager; + readonly BlobLeaseManager leaseManager; readonly PartitionManager partitionManager; readonly OrchestrationSessionManager orchestrationSessionManager; readonly object hubCreationLock; @@ -363,9 +363,9 @@ public async Task CreateAsync(bool recreateInstanceStore) { if (recreateInstanceStore) { - await DeleteTrackingStore(); + await DeleteTrackingStore(); - this.taskHubCreator.Reset(); + this.taskHubCreator.Reset(); } await this.taskHubCreator.Value; @@ -824,7 +824,7 @@ public async Task CompleteTaskOrchestrationWorkItemAsync( } session.StartNewLogicalTraceScope(); - OrchestrationRuntimeState runtimeState = newOrchestrationRuntimeState??workItem.OrchestrationRuntimeState; + OrchestrationRuntimeState runtimeState = newOrchestrationRuntimeState ?? workItem.OrchestrationRuntimeState; string instanceId = workItem.InstanceId; string executionId = runtimeState.OrchestrationInstance.ExecutionId; @@ -1070,7 +1070,7 @@ public async Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workIte // The context does not exist - possibly because it was already removed. AnalyticsEventSource.Log.AssertFailure( this.storageAccountName, - this.settings.TaskHubName, + this.settings.TaskHubName, $"Could not find context for work item with ID = {workItem.Id}.", Utils.ExtensionVersion); return; @@ -1128,7 +1128,7 @@ public async Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem // The context does not exist - possibly because it was already removed. AnalyticsEventSource.Log.AssertFailure( this.storageAccountName, - this.settings.TaskHubName, + this.settings.TaskHubName, $"Could not find context for work item with ID = {workItem.Id}.", Utils.ExtensionVersion); return; @@ -1450,7 +1450,7 @@ public async Task WaitForOrchestrationAsync( while (!cancellationToken.IsCancellationRequested && timeout > TimeSpan.Zero) { OrchestrationState state = await this.GetOrchestrationStateAsync(instanceId, executionId); - if (state == null || + if (state == null || state.OrchestrationStatus == OrchestrationStatus.Running || state.OrchestrationStatus == OrchestrationStatus.Pending || state.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew) diff --git a/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs b/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs index 0b32404e5..dd170b18e 100644 --- a/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs +++ b/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs @@ -11,11 +11,12 @@ // limitations under the License. // ---------------------------------------------------------------------------------- -using System.Collections.Generic; -using DurableTask.Core.History; - namespace DurableTask.Core.Command { + + using System.Collections.Generic; + using DurableTask.Core.History; + internal class OrchestrationCompleteOrchestratorAction : OrchestratorAction { public OrchestrationCompleteOrchestratorAction() @@ -32,6 +33,6 @@ public OrchestrationCompleteOrchestratorAction() public string NewVersion { get; set; } - public IList CarryOverEvents { get; set; } + public IList CarryOverEvents { get; } } } \ No newline at end of file diff --git a/src/DurableTask.Core/TaskOrchestrationContext.cs b/src/DurableTask.Core/TaskOrchestrationContext.cs index 792da6aa4..57faa273e 100644 --- a/src/DurableTask.Core/TaskOrchestrationContext.cs +++ b/src/DurableTask.Core/TaskOrchestrationContext.cs @@ -37,7 +37,8 @@ internal class TaskOrchestrationContext : OrchestrationContext public bool HasContinueAsNew => continueAsNew != null; - public void AddEventToNextIteration(HistoryEvent he){ + public void AddEventToNextIteration(HistoryEvent he) + { continueAsNew.CarryOverEvents.Add(he); } diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 9b8dff334..fe61ffa87 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -364,9 +364,11 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work "TaskOrchestrationDispatcher-UpdatingStateForContinuation", workItem.InstanceId, "Updating state for continuation"); + runtimeState = new OrchestrationRuntimeState(); runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); runtimeState.AddEvent(continueAsNewExecutionStarted); + if (carryOverEvents != null) { foreach (var historyEvent in carryOverEvents) @@ -374,19 +376,20 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work runtimeState.AddEvent(historyEvent); } } + runtimeState.AddEvent(new OrchestratorCompletedEvent(-1)); workItem.OrchestrationRuntimeState = runtimeState; TaskOrchestration orchestration = this.objectManager.GetObject(runtimeState.Name, continueAsNewExecutionStarted.Version); - + workItem.Cursor = new OrchestrationExecutionCursor(runtimeState, orchestration, new TaskOrchestrationExecutor(runtimeState, workItem.Cursor.TaskOrchestration, orchestrationService.SkipEventsOnContinuation), null); await orchestrationService.RenewTaskOrchestrationWorkItemLockAsync(workItem); } instanceState = Utils.BuildOrchestrationState(runtimeState); } - }while (continuedAsNew); - + } while (continuedAsNew); + } workItem.OrchestrationRuntimeState = oldOrchestrationRuntimeState; @@ -402,7 +405,7 @@ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync( workItem.OrchestrationRuntimeState = runtimeState; - return isCompleted|| continuedAsNew || isInterrupted; + return isCompleted || continuedAsNew || isInterrupted; } async Task ExecuteOrchestrationAsync(OrchestrationRuntimeState runtimeState) diff --git a/src/DurableTask.Core/TaskOrchestrationExecutor.cs b/src/DurableTask.Core/TaskOrchestrationExecutor.cs index 4db281bfe..e06a6a469 100644 --- a/src/DurableTask.Core/TaskOrchestrationExecutor.cs +++ b/src/DurableTask.Core/TaskOrchestrationExecutor.cs @@ -153,7 +153,7 @@ void ProcessEvent(HistoryEvent historyEvent) this.context.HandleTimerFiredEvent((TimerFiredEvent)historyEvent); break; case EventType.EventRaised: - if (this.skipCarryOverEvents||!this.context.HasContinueAsNew) + if (this.skipCarryOverEvents || !this.context.HasContinueAsNew) { var eventRaisedEvent = (EventRaisedEvent)historyEvent; this.taskOrchestration.RaiseEvent(this.context, eventRaisedEvent.Name, eventRaisedEvent.Input); diff --git a/test/DurableTask.ServiceBus.Tests/ServiceBusOrchestrationServiceTests.cs b/test/DurableTask.ServiceBus.Tests/ServiceBusOrchestrationServiceTests.cs index db71920b4..354e4651d 100644 --- a/test/DurableTask.ServiceBus.Tests/ServiceBusOrchestrationServiceTests.cs +++ b/test/DurableTask.ServiceBus.Tests/ServiceBusOrchestrationServiceTests.cs @@ -91,10 +91,6 @@ await this.taskHub.AddTaskOrchestrations(typeof(CounterOrchestration)) OrchestrationInstance temp = new OrchestrationInstance() { InstanceId = id.InstanceId }; // Perform some operations await this.client.RaiseEventAsync(temp, "operation", "incr"); - - // TODO: Sleeping to avoid a race condition where multiple ContinueAsNew messages - // are processed by the same instance at the same time, resulting in a corrupt - // storage failure in DTFx. await this.client.RaiseEventAsync(temp, "operation", "incr"); await this.client.RaiseEventAsync(temp, "operation", "decr"); await this.client.RaiseEventAsync(temp, "operation", "incr"); From fa9c275104382c15d96f8f41c08b1152c48e9ac5 Mon Sep 17 00:00:00 2001 From: Kanodia Adarsh Date: Tue, 22 Jan 2019 11:23:56 -0800 Subject: [PATCH 4/8] Commit old state in case of an instance store backed tracking store --- .../AzureStorageOrchestrationService.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 17643aebd..a1681852c 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -845,6 +845,12 @@ await this.CommitOutboundQueueMessages( try { session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, instanceId, executionId, session.ETag); + + if(this.trackingStore is InstanceStoreBackedTrackingStore && workItem.OrchestrationRuntimeState != runtimeState){ + + session.ETag = await this.trackingStore.UpdateStateAsync(workItem.OrchestrationRuntimeState, instanceId, workItem.OrchestrationRuntimeState.OrchestrationInstance.ExecutionId, session.ETag); + + } } catch (Exception e) { From 1715aa5de341ccedab1a404e955c01b70268f6e0 Mon Sep 17 00:00:00 2001 From: Kanodia Adarsh Date: Tue, 22 Jan 2019 11:32:44 -0800 Subject: [PATCH 5/8] nit: fixed Indent --- .../ServiceBusOrchestrationService.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs b/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs index 9c016a09e..173c5cec1 100644 --- a/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs +++ b/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs @@ -762,10 +762,10 @@ public async Task CompleteTaskOrchestrationWorkItemAsync( { trackingMessages = await CreateTrackingMessagesAsync(newOrchestrationRuntimeState, sessionState.SequenceNumber); TraceHelper.TraceInstance( - TraceEventType.Information, - "ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItem-TrackingMessages", - newOrchestrationRuntimeState.OrchestrationInstance, - "Created {0} tracking messages", trackingMessages.Count); + TraceEventType.Information, + "ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItem-TrackingMessages", + newOrchestrationRuntimeState.OrchestrationInstance, + "Created {0} tracking messages", trackingMessages.Count); if (trackingMessages.Count > 0) { From 34e1d6c7b3b31cc71567026f62e63793e2627275 Mon Sep 17 00:00:00 2001 From: Kanodia Adarsh Date: Tue, 22 Jan 2019 12:25:05 -0800 Subject: [PATCH 6/8] move dual runtime state commit to the tracking store. --- .../AzureStorageOrchestrationService.cs | 8 +------- .../Tracking/AzureTableTrackingStore.cs | 1 + .../Tracking/ITrackingStore.cs | 3 ++- .../Tracking/InstanceStoreBackedTrackingStore.cs | 13 ++++++++++++- .../Tracking/TrackingStoreBase.cs | 2 +- 5 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index a1681852c..e6622b9c9 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -844,13 +844,7 @@ await this.CommitOutboundQueueMessages( // will result in a duplicate replay of the orchestration with no side-effects. try { - session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, instanceId, executionId, session.ETag); - - if(this.trackingStore is InstanceStoreBackedTrackingStore && workItem.OrchestrationRuntimeState != runtimeState){ - - session.ETag = await this.trackingStore.UpdateStateAsync(workItem.OrchestrationRuntimeState, instanceId, workItem.OrchestrationRuntimeState.OrchestrationInstance.ExecutionId, session.ETag); - - } + session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETag); } catch (Exception e) { diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index d9fe81ada..99b31c003 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -852,6 +852,7 @@ public override Task StartAsync() /// public override async Task UpdateStateAsync( OrchestrationRuntimeState runtimeState, + OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTagValue) diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index 25ad589a4..7f49177fb 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -65,10 +65,11 @@ interface ITrackingStore /// Update State in the Tracking store for a particular orchestration instance and execution base on the new runtime state /// /// The New RuntimeState + /// The RuntimeState for an olderExecution /// InstanceId for the Orchestration Update /// ExecutionId for the Orchestration Update /// The ETag value to use for safe updates - Task UpdateStateAsync(OrchestrationRuntimeState runtimeState, string instanceId, string executionId, string eTag); + Task UpdateStateAsync(OrchestrationRuntimeState runtimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag); /// /// Get The Orchestration State for the Latest or All Executions diff --git a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs index 5fe9707ec..e2979231d 100644 --- a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs @@ -128,7 +128,18 @@ public override Task StartAsync() } /// - public override async Task UpdateStateAsync(OrchestrationRuntimeState runtimeState, string instanceId, string executionId, string eTag) + public override async Task UpdateStateAsync(OrchestrationRuntimeState runtimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag) + { + if (runtimeState != oldRuntimeState) + { + eTag = await UpdateStateAsync(oldRuntimeState, instanceId, oldRuntimeState.OrchestrationInstance.ExecutionId, eTag); + } + + return await UpdateStateAsync(runtimeState, instanceId, executionId, eTag); + } + + /// + private async Task UpdateStateAsync(OrchestrationRuntimeState runtimeState, string instanceId, string executionId, string eTag) { int oldEventsCount = (runtimeState.Events.Count - runtimeState.NewEvents.Count); await instanceStore.WriteEntitiesAsync(runtimeState.NewEvents.Select((x, i) => diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index 169ed093e..b38fe151a 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -97,6 +97,6 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId) public abstract Task StartAsync(); /// - public abstract Task UpdateStateAsync(OrchestrationRuntimeState runtimeState, string instanceId, string executionId, string eTag); + public abstract Task UpdateStateAsync(OrchestrationRuntimeState runtimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag); } } From 33a540186d727fc7b3f092901eb0b5fb45d61c61 Mon Sep 17 00:00:00 2001 From: Kanodia Adarsh Date: Tue, 22 Jan 2019 16:19:32 -0800 Subject: [PATCH 7/8] Renames nits, change EventHandlingOnContinue as new to an enum. --- .../AzureStorageOrchestrationService.cs | 6 ++-- ...zureStorageOrchestrationServiceSettings.cs | 5 +-- .../Tracking/AzureTableTrackingStore.cs | 10 +++--- .../Tracking/ITrackingStore.cs | 4 +-- .../InstanceStoreBackedTrackingStore.cs | 6 ++-- .../Tracking/TrackingStoreBase.cs | 2 +- ...OrchestrationCompleteOrchestratorAction.cs | 6 ++-- .../EventHandlingOnContinueAsNew.cs | 31 +++++++++++++++++++ src/DurableTask.Core/IOrchestrationService.cs | 4 +-- .../TaskOrchestrationDispatcherSettings.cs | 6 ++-- .../TaskOrchestrationContext.cs | 2 +- .../TaskOrchestrationDispatcher.cs | 24 +++++++------- .../TaskOrchestrationExecutor.cs | 4 +-- .../LocalOrchestrationService.cs | 4 +-- .../ServiceBusOrchestrationService.cs | 4 +-- 15 files changed, 75 insertions(+), 43 deletions(-) create mode 100644 src/DurableTask.Core/EventHandlingOnContinueAsNew.cs diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index e6622b9c9..cae9ba240 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -275,11 +275,11 @@ public int MaxConcurrentTaskActivityWorkItems } /// - /// Will not carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew + /// Should we carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew /// - public bool SkipEventsOnContinuation + public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { - get { return this.settings.SkipEventsOnContinuation; } + get { return this.settings.EventBehaviourForContinueAsNew; } } // We always leave the dispatcher counts at one unless we can find a customer workload that requires more. diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index 5967ddabe..f8eeb876c 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -14,6 +14,7 @@ namespace DurableTask.AzureStorage { using System; + using DurableTask.Core; using Microsoft.WindowsAzure.Storage.Queue; using Microsoft.WindowsAzure.Storage.Table; @@ -141,8 +142,8 @@ public class AzureStorageOrchestrationServiceSettings public StorageAccountDetails StorageAccountDetails { get; set; } /// - /// Will not carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew + /// Should we carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew /// - public bool SkipEventsOnContinuation { get; set; } = false; + public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; set; } = BehaviorOnContinueAsNew.Carryover; } } diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 99b31c003..0e63649c7 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -851,17 +851,17 @@ public override Task StartAsync() /// public override async Task UpdateStateAsync( - OrchestrationRuntimeState runtimeState, + OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTagValue) { int estimatedBytes = 0; - IList newEvents = runtimeState.NewEvents; - IList allEvents = runtimeState.Events; + IList newEvents = newRuntimeState.NewEvents; + IList allEvents = newRuntimeState.Events; - int episodeNumber = Utils.GetEpisodeNumber(runtimeState); + int episodeNumber = Utils.GetEpisodeNumber(newRuntimeState); var newEventListBuffer = new StringBuilder(4000); var historyEventBatch = new TableBatchOperation(); @@ -872,7 +872,7 @@ public override async Task UpdateStateAsync( { Properties = { - ["CustomStatus"] = new EntityProperty(runtimeState.Status), + ["CustomStatus"] = new EntityProperty(newRuntimeState.Status), ["ExecutionId"] = new EntityProperty(executionId), ["LastUpdatedTime"] = new EntityProperty(newEvents.Last().Timestamp), } diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index 7f49177fb..f67f0fd9b 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -64,12 +64,12 @@ interface ITrackingStore /// /// Update State in the Tracking store for a particular orchestration instance and execution base on the new runtime state /// - /// The New RuntimeState + /// The New RuntimeState /// The RuntimeState for an olderExecution /// InstanceId for the Orchestration Update /// ExecutionId for the Orchestration Update /// The ETag value to use for safe updates - Task UpdateStateAsync(OrchestrationRuntimeState runtimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag); + Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag); /// /// Get The Orchestration State for the Latest or All Executions diff --git a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs index e2979231d..2b16f7607 100644 --- a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs @@ -128,14 +128,14 @@ public override Task StartAsync() } /// - public override async Task UpdateStateAsync(OrchestrationRuntimeState runtimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag) + public override async Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag) { - if (runtimeState != oldRuntimeState) + if (newRuntimeState != oldRuntimeState) { eTag = await UpdateStateAsync(oldRuntimeState, instanceId, oldRuntimeState.OrchestrationInstance.ExecutionId, eTag); } - return await UpdateStateAsync(runtimeState, instanceId, executionId, eTag); + return await UpdateStateAsync(newRuntimeState, instanceId, executionId, eTag); } /// diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index b38fe151a..0b002c801 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -97,6 +97,6 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId) public abstract Task StartAsync(); /// - public abstract Task UpdateStateAsync(OrchestrationRuntimeState runtimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag); + public abstract Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag); } } diff --git a/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs b/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs index dd170b18e..35767ef48 100644 --- a/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs +++ b/src/DurableTask.Core/Command/OrchestrationCompleteOrchestratorAction.cs @@ -13,7 +13,6 @@ namespace DurableTask.Core.Command { - using System.Collections.Generic; using DurableTask.Core.History; @@ -21,8 +20,9 @@ internal class OrchestrationCompleteOrchestratorAction : OrchestratorAction { public OrchestrationCompleteOrchestratorAction() { - CarryOverEvents = new List(); + CarryoverEvents = new List(); } + public OrchestrationStatus OrchestrationStatus; public override OrchestratorActionType OrchestratorActionType => OrchestratorActionType.OrchestrationComplete; @@ -33,6 +33,6 @@ public OrchestrationCompleteOrchestratorAction() public string NewVersion { get; set; } - public IList CarryOverEvents { get; } + public IList CarryoverEvents { get; } } } \ No newline at end of file diff --git a/src/DurableTask.Core/EventHandlingOnContinueAsNew.cs b/src/DurableTask.Core/EventHandlingOnContinueAsNew.cs new file mode 100644 index 000000000..1d039fdf6 --- /dev/null +++ b/src/DurableTask.Core/EventHandlingOnContinueAsNew.cs @@ -0,0 +1,31 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.Core +{ + /// + /// Specifies Behavior to be followed when dealing with unprocessed EventRaisedEvents when an orchestration continues as new + /// + public enum BehaviorOnContinueAsNew + { + /// + /// All pending EventRaisedEvents will be ignored + /// + Ignore, + + /// + /// + /// + Carryover, + } +} diff --git a/src/DurableTask.Core/IOrchestrationService.cs b/src/DurableTask.Core/IOrchestrationService.cs index ea0d5c644..dc4b61c70 100644 --- a/src/DurableTask.Core/IOrchestrationService.cs +++ b/src/DurableTask.Core/IOrchestrationService.cs @@ -98,9 +98,9 @@ public interface IOrchestrationService int MaxConcurrentTaskOrchestrationWorkItems { get; } /// - /// Will not carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew + /// Should we carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew /// - bool SkipEventsOnContinuation { get; } + BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; } /// /// Wait for the next orchestration work item and return the orchestration work item diff --git a/src/DurableTask.Core/Settings/TaskOrchestrationDispatcherSettings.cs b/src/DurableTask.Core/Settings/TaskOrchestrationDispatcherSettings.cs index 533719c1f..1e9ab989d 100644 --- a/src/DurableTask.Core/Settings/TaskOrchestrationDispatcherSettings.cs +++ b/src/DurableTask.Core/Settings/TaskOrchestrationDispatcherSettings.cs @@ -28,7 +28,7 @@ public TaskOrchestrationDispatcherSettings() DispatcherCount = FrameworkConstants.OrchestrationDefaultDispatcherCount; MaxConcurrentOrchestrations = FrameworkConstants.OrchestrationDefaultMaxConcurrentItems; CompressOrchestrationState = false; - SkipEventsOnContinuation = false; + EventBehaviourForContinueAsNew = BehaviorOnContinueAsNew.Carryover; } /// @@ -57,9 +57,9 @@ public TaskOrchestrationDispatcherSettings() public bool CompressOrchestrationState { get; set; } /// - /// Will not carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew + /// Should we carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew /// - public bool SkipEventsOnContinuation { get; set; } + public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; set; } internal TaskOrchestrationDispatcherSettings Clone() { diff --git a/src/DurableTask.Core/TaskOrchestrationContext.cs b/src/DurableTask.Core/TaskOrchestrationContext.cs index 57faa273e..63679b38d 100644 --- a/src/DurableTask.Core/TaskOrchestrationContext.cs +++ b/src/DurableTask.Core/TaskOrchestrationContext.cs @@ -39,7 +39,7 @@ internal class TaskOrchestrationContext : OrchestrationContext public void AddEventToNextIteration(HistoryEvent he) { - continueAsNew.CarryOverEvents.Add(he); + continueAsNew.CarryoverEvents.Add(he); } public TaskOrchestrationContext(OrchestrationInstance orchestrationInstance, TaskScheduler taskScheduler) diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index fe61ffa87..588c33227 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -198,7 +198,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - OrchestrationRuntimeState oldOrchestrationRuntimeState = runtimeState; + OrchestrationRuntimeState originalOrchestrationRuntimeState = runtimeState; OrchestrationState instanceState = null; @@ -220,11 +220,11 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work continuedAsNewMessage = null; TraceHelper.TraceInstance( - TraceEventType.Verbose, - "TaskOrchestrationDispatcher-ExecuteUserOrchestration-Begin", - runtimeState.OrchestrationInstance, - "Executing user orchestration: {0}", - DataConverter.Serialize(runtimeState.GetOrchestrationRuntimeStateDump(), true)); + TraceEventType.Verbose, + "TaskOrchestrationDispatcher-ExecuteUserOrchestration-Begin", + runtimeState.OrchestrationInstance, + "Executing user orchestration: {0}", + DataConverter.Serialize(runtimeState.GetOrchestrationRuntimeStateDump(), true)); if (workItem.Cursor == null) { @@ -283,10 +283,10 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work { continuedAsNewMessage = workflowInstanceCompletedMessage; continueAsNewExecutionStarted = workflowInstanceCompletedMessage.Event as ExecutionStartedEvent; - if (completeDecision.CarryOverEvents.Any()) + if (completeDecision.CarryoverEvents.Any()) { - carryOverEvents = completeDecision.CarryOverEvents.ToList(); - completeDecision.CarryOverEvents.Clear(); + carryOverEvents = completeDecision.CarryoverEvents.ToList(); + completeDecision.CarryoverEvents.Clear(); } } else @@ -382,7 +382,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work TaskOrchestration orchestration = this.objectManager.GetObject(runtimeState.Name, continueAsNewExecutionStarted.Version); - workItem.Cursor = new OrchestrationExecutionCursor(runtimeState, orchestration, new TaskOrchestrationExecutor(runtimeState, workItem.Cursor.TaskOrchestration, orchestrationService.SkipEventsOnContinuation), null); + workItem.Cursor = new OrchestrationExecutionCursor(runtimeState, orchestration, new TaskOrchestrationExecutor(runtimeState, workItem.Cursor.TaskOrchestration, orchestrationService.EventBehaviourForContinueAsNew), null); await orchestrationService.RenewTaskOrchestrationWorkItemLockAsync(workItem); } @@ -392,7 +392,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work } - workItem.OrchestrationRuntimeState = oldOrchestrationRuntimeState; + workItem.OrchestrationRuntimeState = originalOrchestrationRuntimeState; await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync( workItem, @@ -425,7 +425,7 @@ async Task ExecuteOrchestrationAsync(Orchestration dispatchContext.SetProperty(taskOrchestration); dispatchContext.SetProperty(runtimeState); - var executor = new TaskOrchestrationExecutor(runtimeState, taskOrchestration, orchestrationService.SkipEventsOnContinuation); + var executor = new TaskOrchestrationExecutor(runtimeState, taskOrchestration, orchestrationService.EventBehaviourForContinueAsNew); IEnumerable decisions = null; await this.dispatchPipeline.RunAsync(dispatchContext, _ => diff --git a/src/DurableTask.Core/TaskOrchestrationExecutor.cs b/src/DurableTask.Core/TaskOrchestrationExecutor.cs index e06a6a469..afd0459eb 100644 --- a/src/DurableTask.Core/TaskOrchestrationExecutor.cs +++ b/src/DurableTask.Core/TaskOrchestrationExecutor.cs @@ -32,13 +32,13 @@ internal class TaskOrchestrationExecutor Task result; public TaskOrchestrationExecutor(OrchestrationRuntimeState orchestrationRuntimeState, - TaskOrchestration taskOrchestration, bool skipCarryOverEvents) + TaskOrchestration taskOrchestration, BehaviorOnContinueAsNew eventBehaviourForContinueAsNew) { this.decisionScheduler = new SynchronousTaskScheduler(); this.context = new TaskOrchestrationContext(orchestrationRuntimeState.OrchestrationInstance, this.decisionScheduler); this.orchestrationRuntimeState = orchestrationRuntimeState; this.taskOrchestration = taskOrchestration; - this.skipCarryOverEvents = skipCarryOverEvents; + this.skipCarryOverEvents = eventBehaviourForContinueAsNew == BehaviorOnContinueAsNew.Ignore; } public bool IsCompleted => this.result != null && (this.result.IsCompleted || this.result.IsFaulted); diff --git a/src/DurableTask.Emulator/LocalOrchestrationService.cs b/src/DurableTask.Emulator/LocalOrchestrationService.cs index d9481c5d6..ea7d30e4b 100644 --- a/src/DurableTask.Emulator/LocalOrchestrationService.cs +++ b/src/DurableTask.Emulator/LocalOrchestrationService.cs @@ -504,9 +504,9 @@ public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem work public int TaskActivityDispatcherCount => 1; /// - /// Will not carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew + /// Should we carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew /// - public bool SkipEventsOnContinuation => false; + public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew => BehaviorOnContinueAsNew.Carryover; /// public int MaxConcurrentTaskActivityWorkItems => this.MaxConcurrentWorkItems; diff --git a/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs b/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs index 173c5cec1..2c9fe0d2f 100644 --- a/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs +++ b/src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs @@ -456,9 +456,9 @@ public int GetDelayInSecondsAfterOnFetchException(Exception exception) public int MaxConcurrentTaskOrchestrationWorkItems => this.Settings.TaskOrchestrationDispatcherSettings.MaxConcurrentOrchestrations; /// - /// Will not carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew + /// Should we carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew /// - public bool SkipEventsOnContinuation => this.Settings.TaskOrchestrationDispatcherSettings.SkipEventsOnContinuation; + public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew => this.Settings.TaskOrchestrationDispatcherSettings.EventBehaviourForContinueAsNew; /// /// Wait for the next orchestration work item and return the orchestration work item From f0a38b8692149f41cced5b9758c30d1b9aa3e5f3 Mon Sep 17 00:00:00 2001 From: Kanodia Adarsh Date: Tue, 22 Jan 2019 16:30:32 -0800 Subject: [PATCH 8/8] Added comments --- .../Tracking/InstanceStoreBackedTrackingStore.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs index 2b16f7607..9d3235140 100644 --- a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs @@ -130,6 +130,8 @@ public override Task StartAsync() /// public override async Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag) { + //In case there is a runtime state for an older execution/iteration as well that needs to be committed, commit it. + //This may be the case if a ContinueAsNew was executed on the orchestration if (newRuntimeState != oldRuntimeState) { eTag = await UpdateStateAsync(oldRuntimeState, instanceId, oldRuntimeState.OrchestrationInstance.ExecutionId, eTag);