Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Continue As New And Solve Missed Raised Events #251

Merged
merged 8 commits into from
Jan 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 8 additions & 18 deletions Test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public async Task PurgeInstanceHistoryForSingleInstanceWithoutLargeMessageBlobs(
await host.StartAsync();
TestOrchestrationClient client = await host.StartOrchestrationAsync(typeof(Orchestrations.Factorial), 110, instanceId);
await client.WaitForCompletionAsync(TimeSpan.FromSeconds(30));

List<HistoryStateEvent> historyEvents = await client.GetOrchestrationHistoryAsync(instanceId);
Assert.IsTrue(historyEvents.Count > 0);

Expand Down Expand Up @@ -538,17 +538,9 @@ public async Task ActorOrchestration(bool enableExtendedSessions)

// Perform some operations
await client.RaiseEventAsync("operation", "incr");

// TODO: Sleeping to avoid a race condition where multiple ContinueAsNew messages
// are processed by the same instance at the same time, resulting in a corrupt
// storage failure in DTFx.
await Task.Delay(2000);
await client.RaiseEventAsync("operation", "incr");
await Task.Delay(2000);
await client.RaiseEventAsync("operation", "incr");
await Task.Delay(2000);
await client.RaiseEventAsync("operation", "decr");
await Task.Delay(2000);
await client.RaiseEventAsync("operation", "incr");
await Task.Delay(2000);

Expand Down Expand Up @@ -607,9 +599,7 @@ public async Task ActorOrchestrationDeleteAllLargeMessageBlobs(bool enableExtend

int blobCount = await this.GetBlobCount("test-largemessages", instanceId);

// Ideally there would only be three blobs at the end of the test.
// TODO: https://github.com/Azure/azure-functions-durable-extension/issues/509
Assert.AreEqual(9, blobCount);
Assert.AreEqual(3, blobCount);

await client.PurgeInstanceHistoryByTimePeriod(
startDateTime,
Expand Down Expand Up @@ -647,7 +637,7 @@ private async Task<Tuple<string, TestOrchestrationClient>> ValidateCharacterCoun

// Need to wait for the instance to start before sending events to it.
// TODO: This requirement may not be ideal and should be revisited.
OrchestrationState orchestrationState =
OrchestrationState orchestrationState =
await client.WaitForStartupAsync(TimeSpan.FromSeconds(10));

// Perform some operations
Expand Down Expand Up @@ -745,7 +735,7 @@ public async Task RewindOrchestrationsFail()

var client1 = await host.StartOrchestrationAsync(
typeof(Orchestrations.FactorialOrchestratorFail),
input: 3,
input: 3,
instanceId: singletonInstanceId1);

var statusFail = await client1.WaitForCompletionAsync(TimeSpan.FromSeconds(30));
Expand All @@ -759,7 +749,7 @@ public async Task RewindOrchestrationsFail()
input: "Catherine",
instanceId: singletonInstanceId2);

await client1.RewindAsync("Rewind failed orchestration only");
await client1.RewindAsync("Rewind failed orchestration only");

var statusRewind = await client1.WaitForCompletionAsync(TimeSpan.FromSeconds(30));

Expand Down Expand Up @@ -1289,7 +1279,7 @@ public async Task LargeBinaryByteMessagePayloads(bool enableExtendedSessions)
string currentDirectory = Directory.GetCurrentDirectory();
string originalFilePath = Path.Combine(currentDirectory, originalFileName);
byte[] readBytes = File.ReadAllBytes(originalFilePath);

var client = await host.StartOrchestrationAsync(typeof(Orchestrations.EchoBytes), readBytes);
var status = await client.WaitForCompletionAsync(TimeSpan.FromMinutes(1));

Expand Down Expand Up @@ -1949,7 +1939,7 @@ public override async Task<int> RunTask(OrchestrationContext context, int curren
}

return currentValue;

}

async Task<string> WaitForOperation()
Expand Down Expand Up @@ -2228,7 +2218,7 @@ protected override string Execute(TaskContext context, string input)
}
}

internal class HelloFailMultipleActivity : TaskActivity<string, string>
internal class HelloFailMultipleActivity : TaskActivity<string, string>
{
public static bool ShouldFail1 = true;
public static bool ShouldFail2 = true;
Expand Down
24 changes: 16 additions & 8 deletions src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public sealed class AzureStorageOrchestrationService :
readonly TableEntityConverter tableEntityConverter;

readonly ResettableLazy<Task> taskHubCreator;
readonly BlobLeaseManager leaseManager;
readonly BlobLeaseManager leaseManager;
readonly PartitionManager<BlobLease> partitionManager;
readonly OrchestrationSessionManager orchestrationSessionManager;
readonly object hubCreationLock;
Expand Down Expand Up @@ -274,6 +274,14 @@ public int MaxConcurrentTaskActivityWorkItems
get { return this.settings.MaxConcurrentTaskActivityWorkItems; }
}

/// <summary>
/// Should we carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew
/// </summary>
public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew
{
get { return this.settings.EventBehaviourForContinueAsNew; }
}

// We always leave the dispatcher counts at one unless we can find a customer workload that requires more.
/// <inheritdoc />
public int TaskActivityDispatcherCount { get; } = 1;
Expand Down Expand Up @@ -355,9 +363,9 @@ public async Task CreateAsync(bool recreateInstanceStore)
{
if (recreateInstanceStore)
{
await DeleteTrackingStore();
await DeleteTrackingStore();

this.taskHubCreator.Reset();
this.taskHubCreator.Reset();
}

await this.taskHubCreator.Value;
Expand Down Expand Up @@ -816,7 +824,7 @@ public async Task CompleteTaskOrchestrationWorkItemAsync(
}

session.StartNewLogicalTraceScope();
OrchestrationRuntimeState runtimeState = workItem.OrchestrationRuntimeState;
OrchestrationRuntimeState runtimeState = newOrchestrationRuntimeState ?? workItem.OrchestrationRuntimeState;

string instanceId = workItem.InstanceId;
string executionId = runtimeState.OrchestrationInstance.ExecutionId;
Expand All @@ -836,7 +844,7 @@ await this.CommitOutboundQueueMessages(
// will result in a duplicate replay of the orchestration with no side-effects.
try
{
session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, instanceId, executionId, session.ETag);
session.ETag = await this.trackingStore.UpdateStateAsync(runtimeState, workItem.OrchestrationRuntimeState, instanceId, executionId, session.ETag);
}
catch (Exception e)
{
Expand Down Expand Up @@ -1062,7 +1070,7 @@ public async Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workIte
// The context does not exist - possibly because it was already removed.
AnalyticsEventSource.Log.AssertFailure(
this.storageAccountName,
this.settings.TaskHubName,
this.settings.TaskHubName,
$"Could not find context for work item with ID = {workItem.Id}.",
Utils.ExtensionVersion);
return;
Expand Down Expand Up @@ -1120,7 +1128,7 @@ public async Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem
// The context does not exist - possibly because it was already removed.
AnalyticsEventSource.Log.AssertFailure(
this.storageAccountName,
this.settings.TaskHubName,
this.settings.TaskHubName,
$"Could not find context for work item with ID = {workItem.Id}.",
Utils.ExtensionVersion);
return;
Expand Down Expand Up @@ -1442,7 +1450,7 @@ public async Task<OrchestrationState> WaitForOrchestrationAsync(
while (!cancellationToken.IsCancellationRequested && timeout > TimeSpan.Zero)
{
OrchestrationState state = await this.GetOrchestrationStateAsync(instanceId, executionId);
if (state == null ||
if (state == null ||
state.OrchestrationStatus == OrchestrationStatus.Running ||
state.OrchestrationStatus == OrchestrationStatus.Pending ||
state.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace DurableTask.AzureStorage
{
using System;
using DurableTask.Core;
using Microsoft.WindowsAzure.Storage.Queue;
using Microsoft.WindowsAzure.Storage.Table;

Expand Down Expand Up @@ -139,5 +140,10 @@ public class AzureStorageOrchestrationServiceSettings
/// If provided, this is used to connect to Azure Storage
/// </summary>
public StorageAccountDetails StorageAccountDetails { get; set; }

/// <summary>
/// Should we carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew
/// </summary>
public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; set; } = BehaviorOnContinueAsNew.Carryover;
}
}
11 changes: 6 additions & 5 deletions src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -851,16 +851,17 @@ public override Task StartAsync()

/// <inheritdoc />
public override async Task<string> UpdateStateAsync(
OrchestrationRuntimeState runtimeState,
OrchestrationRuntimeState newRuntimeState,
OrchestrationRuntimeState oldRuntimeState,
string instanceId,
string executionId,
string eTagValue)
{
int estimatedBytes = 0;
IList<HistoryEvent> newEvents = runtimeState.NewEvents;
IList<HistoryEvent> allEvents = runtimeState.Events;
IList<HistoryEvent> newEvents = newRuntimeState.NewEvents;
IList<HistoryEvent> allEvents = newRuntimeState.Events;

int episodeNumber = Utils.GetEpisodeNumber(runtimeState);
int episodeNumber = Utils.GetEpisodeNumber(newRuntimeState);

var newEventListBuffer = new StringBuilder(4000);
var historyEventBatch = new TableBatchOperation();
Expand All @@ -871,7 +872,7 @@ public override async Task<string> UpdateStateAsync(
{
Properties =
{
["CustomStatus"] = new EntityProperty(runtimeState.Status),
["CustomStatus"] = new EntityProperty(newRuntimeState.Status),
["ExecutionId"] = new EntityProperty(executionId),
["LastUpdatedTime"] = new EntityProperty(newEvents.Last().Timestamp),
}
Expand Down
5 changes: 3 additions & 2 deletions src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ interface ITrackingStore
/// <summary>
/// Update State in the Tracking store for a particular orchestration instance and execution base on the new runtime state
/// </summary>
/// <param name="runtimeState">The New RuntimeState</param>
/// <param name="newRuntimeState">The New RuntimeState</param>
/// <param name="oldRuntimeState">The RuntimeState for an olderExecution</param>
/// <param name="instanceId">InstanceId for the Orchestration Update</param>
/// <param name="executionId">ExecutionId for the Orchestration Update</param>
/// <param name="eTag">The ETag value to use for safe updates</param>
Task<string> UpdateStateAsync(OrchestrationRuntimeState runtimeState, string instanceId, string executionId, string eTag);
Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag);

/// <summary>
/// Get The Orchestration State for the Latest or All Executions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,20 @@ public override Task StartAsync()
}

/// <inheritdoc />
public override async Task<string> UpdateStateAsync(OrchestrationRuntimeState runtimeState, string instanceId, string executionId, string eTag)
public override async Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag)
{
//In case there is a runtime state for an older execution/iteration as well that needs to be committed, commit it.
//This may be the case if a ContinueAsNew was executed on the orchestration
if (newRuntimeState != oldRuntimeState)
{
eTag = await UpdateStateAsync(oldRuntimeState, instanceId, oldRuntimeState.OrchestrationInstance.ExecutionId, eTag);
adarsh1 marked this conversation as resolved.
Show resolved Hide resolved
}

return await UpdateStateAsync(newRuntimeState, instanceId, executionId, eTag);
}

/// <inheritdoc />
private async Task<string> UpdateStateAsync(OrchestrationRuntimeState runtimeState, string instanceId, string executionId, string eTag)
{
int oldEventsCount = (runtimeState.Events.Count - runtimeState.NewEvents.Count);
await instanceStore.WriteEntitiesAsync(runtimeState.NewEvents.Select((x, i) =>
Expand Down
2 changes: 1 addition & 1 deletion src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,6 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId)
public abstract Task StartAsync();

/// <inheritdoc />
public abstract Task<string> UpdateStateAsync(OrchestrationRuntimeState runtimeState, string instanceId, string executionId, string eTag);
public abstract Task<string> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, string eTag);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,16 @@

namespace DurableTask.Core.Command
{
using System.Collections.Generic;
using DurableTask.Core.History;

internal class OrchestrationCompleteOrchestratorAction : OrchestratorAction
{
public OrchestrationCompleteOrchestratorAction()
{
CarryoverEvents = new List<HistoryEvent>();
}
adarsh1 marked this conversation as resolved.
Show resolved Hide resolved

public OrchestrationStatus OrchestrationStatus;

public override OrchestratorActionType OrchestratorActionType => OrchestratorActionType.OrchestrationComplete;
Expand All @@ -24,5 +32,7 @@ internal class OrchestrationCompleteOrchestratorAction : OrchestratorAction
public string Details { get; set; }

public string NewVersion { get; set; }

public IList<HistoryEvent> CarryoverEvents { get; }
}
}
31 changes: 31 additions & 0 deletions src/DurableTask.Core/EventHandlingOnContinueAsNew.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.Core
{
/// <summary>
/// Specifies Behavior to be followed when dealing with unprocessed EventRaisedEvents when an orchestration continues as new
/// </summary>
public enum BehaviorOnContinueAsNew
{
/// <summary>
/// All pending EventRaisedEvents will be ignored
/// </summary>
Ignore,

/// <summary>
///
/// </summary>
Carryover,
}
}
5 changes: 5 additions & 0 deletions src/DurableTask.Core/IOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public interface IOrchestrationService
/// </summary>
int MaxConcurrentTaskOrchestrationWorkItems { get; }

/// <summary>
/// Should we carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew
/// </summary>
BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; }

/// <summary>
/// Wait for the next orchestration work item and return the orchestration work item
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public TaskOrchestrationDispatcherSettings()
DispatcherCount = FrameworkConstants.OrchestrationDefaultDispatcherCount;
MaxConcurrentOrchestrations = FrameworkConstants.OrchestrationDefaultMaxConcurrentItems;
CompressOrchestrationState = false;
EventBehaviourForContinueAsNew = BehaviorOnContinueAsNew.Carryover;
}

/// <summary>
Expand Down Expand Up @@ -55,6 +56,11 @@ public TaskOrchestrationDispatcherSettings()
/// </summary>
public bool CompressOrchestrationState { get; set; }

/// <summary>
/// Should we carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew
/// </summary>
public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; set; }

internal TaskOrchestrationDispatcherSettings Clone()
{
return new TaskOrchestrationDispatcherSettings
Expand Down
8 changes: 8 additions & 0 deletions src/DurableTask.Core/TaskOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ internal class TaskOrchestrationContext : OrchestrationContext
bool executionTerminated;
int idCounter;

public bool HasContinueAsNew => continueAsNew != null;

public void AddEventToNextIteration(HistoryEvent he)
{
continueAsNew.CarryoverEvents.Add(he);
}

public TaskOrchestrationContext(OrchestrationInstance orchestrationInstance, TaskScheduler taskScheduler)
{
Utils.UnusedParameter(taskScheduler);
Expand All @@ -54,6 +61,7 @@ public TaskOrchestrationContext(OrchestrationInstance orchestrationInstance, Tas
internal void ClearPendingActions()
{
this.orchestratorActionsMap.Clear();
continueAsNew = null;
}

public override async Task<TResult> ScheduleTask<TResult>(string name, string version,
Expand Down
Loading