Skip to content

Commit

Permalink
Fix failing tests on the DurableTask.Emulator (#267)
Browse files Browse the repository at this point in the history
* Fixed Continue as New for the local orchestration service.
* Match the logic we are using in the ServiceBus orchestration service for setting the session state.
  • Loading branch information
adarsh1 authored and cgillum committed Mar 20, 2019
1 parent 4b29b7a commit f91ae79
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 36 deletions.
94 changes: 60 additions & 34 deletions src/DurableTask.Emulator/LocalOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace DurableTask.Emulator
{
using DurableTask.Core;
using DurableTask.Core.Common;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using Newtonsoft.Json;
Expand Down Expand Up @@ -411,10 +412,22 @@ public Task CompleteTaskOrchestrationWorkItemAsync(
{
lock (this.thisLock)
{
byte[] newSessionState;

if (newOrchestrationRuntimeState == null ||
newOrchestrationRuntimeState.ExecutionStartedEvent == null ||
newOrchestrationRuntimeState.OrchestrationStatus != OrchestrationStatus.Running)
{
newSessionState = null;
}
else
{
newSessionState = SerializeOrchestrationRuntimeState(newOrchestrationRuntimeState);
}

this.orchestratorQueue.CompleteSession(
workItem.InstanceId,
newOrchestrationRuntimeState != null ?
SerializeOrchestrationRuntimeState(newOrchestrationRuntimeState) : null,
newSessionState,
orchestratorMessages,
continuedAsNewMessage
);
Expand All @@ -439,49 +452,62 @@ public Task CompleteTaskOrchestrationWorkItemAsync(
}
}

if (workItem.OrchestrationRuntimeState != newOrchestrationRuntimeState)
{
var oldState = Utils.BuildOrchestrationState(workItem.OrchestrationRuntimeState);
CommitState(workItem.OrchestrationRuntimeState, oldState).GetAwaiter().GetResult();
}

if (state != null)
{
if (!this.instanceStore.TryGetValue(workItem.InstanceId, out Dictionary<string, OrchestrationState> mapState))
{
mapState = new Dictionary<string, OrchestrationState>();
this.instanceStore[workItem.InstanceId] = mapState;
}
CommitState(newOrchestrationRuntimeState, state).GetAwaiter().GetResult();
}
}

mapState[workItem.OrchestrationRuntimeState.OrchestrationInstance.ExecutionId] = state;
return Task.FromResult(0);
}

// signal any waiters waiting on instanceid_executionid or just the latest instanceid_
Task CommitState(OrchestrationRuntimeState runtimeState, OrchestrationState state)
{
if (!this.instanceStore.TryGetValue(runtimeState.OrchestrationInstance.InstanceId, out Dictionary<string, OrchestrationState> mapState))
{
mapState = new Dictionary<string, OrchestrationState>();
this.instanceStore[runtimeState.OrchestrationInstance.InstanceId] = mapState;
}

if (state.OrchestrationStatus == OrchestrationStatus.Running
|| state.OrchestrationStatus == OrchestrationStatus.Pending)
{
return Task.FromResult(0);
}
mapState[runtimeState.OrchestrationInstance.ExecutionId] = state;

string key = workItem.OrchestrationRuntimeState.OrchestrationInstance.InstanceId + "_" +
workItem.OrchestrationRuntimeState.OrchestrationInstance.ExecutionId;
// signal any waiters waiting on instanceid_executionid or just the latest instanceid_

string key1 = workItem.OrchestrationRuntimeState.OrchestrationInstance.InstanceId + "_";
if (state.OrchestrationStatus == OrchestrationStatus.Running
|| state.OrchestrationStatus == OrchestrationStatus.Pending)
{
return Task.FromResult(0);
}

var tasks = new List<Task>();
string key = runtimeState.OrchestrationInstance.InstanceId + "_" +
runtimeState.OrchestrationInstance.ExecutionId;

if (this.orchestrationWaiters.TryGetValue(key, out TaskCompletionSource<OrchestrationState> tcs))
{
tasks.Add(Task.Run(() => tcs.TrySetResult(state)));
}
string key1 = runtimeState.OrchestrationInstance.InstanceId + "_";

// for instance id level waiters, we will not consider ContinueAsNew as a terminal state because
// the high level orchestration is still ongoing
if (state.OrchestrationStatus != OrchestrationStatus.ContinuedAsNew
&& this.orchestrationWaiters.TryGetValue(key1, out TaskCompletionSource<OrchestrationState> tcs1))
{
tasks.Add(Task.Run(() => tcs1.TrySetResult(state)));
}
var tasks = new List<Task>();

if (tasks.Count > 0)
{
Task.WaitAll(tasks.ToArray());
}
}
if (this.orchestrationWaiters.TryGetValue(key, out TaskCompletionSource<OrchestrationState> tcs))
{
tasks.Add(Task.Run(() => tcs.TrySetResult(state)));
}

// for instance id level waiters, we will not consider ContinueAsNew as a terminal state because
// the high level orchestration is still ongoing
if (state.OrchestrationStatus != OrchestrationStatus.ContinuedAsNew
&& this.orchestrationWaiters.TryGetValue(key1, out TaskCompletionSource<OrchestrationState> tcs1))
{
tasks.Add(Task.Run(() => tcs1.TrySetResult(state)));
}

if (tasks.Count > 0)
{
Task.WaitAll(tasks.ToArray());
}

return Task.FromResult(0);
Expand Down
2 changes: 0 additions & 2 deletions test/DurableTask.Emulator.Tests/EmulatorFunctionalTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ await worker.AddTaskOrchestrations(typeof(SimplestGreetingsOrchestration))
await worker.StopAsync(true);
}

[TestCategory("DisabledInCI")] // https://github.com/Azure/durabletask/issues/255
[TestMethod]
public async Task MockRecreateOrchestrationTest()
{
Expand Down Expand Up @@ -143,7 +142,6 @@ await worker.AddTaskOrchestrations(typeof(GreetingsRepeatWaitOrchestration))
await worker.StopAsync(true);
}

[TestCategory("DisabledInCI")] // https://github.com/Azure/durabletask/issues/255
[TestMethod]
public async Task MockGenerationTest()
{
Expand Down

0 comments on commit f91ae79

Please sign in to comment.