Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Command reordering in Core #315

Merged
merged 3 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/Temporalio/Bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions src/Temporalio/Worker/TemporalWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
OnTaskStarting: options.OnTaskStarting,
OnTaskCompleted: options.OnTaskCompleted,
RuntimeMetricMeter: MetricMeter,
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes,
DisableCompletionCommandReordering: options.DisableWorkflowCompletionCommandReordering));
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes));
}
}

Expand Down
9 changes: 0 additions & 9 deletions src/Temporalio/Worker/TemporalWorkerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,6 @@ public TemporalWorkerOptions()
internal Func<WorkflowInstanceDetails, IWorkflowInstance> WorkflowInstanceFactory { get; set; } =
DefaultWorkflowInstanceFactory;

/// <summary>
/// Gets or sets a value indicating whether the workflow completion command reordering will
/// apply.
/// </summary>
/// <remarks>
/// This is visible for testing only.
/// </remarks>
internal bool DisableWorkflowCompletionCommandReordering { get; set; }

/// <summary>
/// Add the given delegate with <see cref="ActivityAttribute" /> as an activity. This is
/// usually a method reference.
Expand Down
34 changes: 12 additions & 22 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowCon
private readonly Action<WorkflowInstance> onTaskStarting;
private readonly Action<WorkflowInstance, Exception?> onTaskCompleted;
private readonly IReadOnlyCollection<Type>? workerLevelFailureExceptionTypes;
private readonly bool disableCompletionCommandReordering;
private readonly Handlers inProgressHandlers = new();
private WorkflowActivationCompletion? completion;
// Will be set to null after last use (i.e. when workflow actually started)
Expand Down Expand Up @@ -190,7 +189,6 @@ public WorkflowInstance(WorkflowInstanceDetails details)
Random = new(details.Start.RandomnessSeed);
TracingEventsEnabled = !details.DisableTracingEvents;
workerLevelFailureExceptionTypes = details.WorkerLevelFailureExceptionTypes;
disableCompletionCommandReordering = details.DisableCompletionCommandReordering;
}

/// <summary>
Expand Down Expand Up @@ -580,8 +578,8 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act)
}
}

// Maybe apply workflow completion command reordering logic
ApplyCompletionCommandReordering(act, completion, out var workflowComplete);
// Maybe apply legacy workflow completion command reordering logic
ApplyLegacyCompletionCommandReordering(act, completion, out var workflowComplete);

// Log warnings if we have completed
if (workflowComplete && !IsReplaying)
Expand Down Expand Up @@ -1425,20 +1423,11 @@ private string GetStackTrace()
}).Where(s => !string.IsNullOrEmpty(s)).Select(s => $"Task waiting at:\n{s}"));
}

private void ApplyCompletionCommandReordering(
private void ApplyLegacyCompletionCommandReordering(
WorkflowActivation act,
WorkflowActivationCompletion completion,
out bool workflowComplete)
{
// In earlier versions of the SDK we allowed commands to be sent after workflow
// completion. These ended up being removed effectively making the result of the
// workflow function mean any other later coroutine commands be ignored. To match
// Go/Java, we are now going to move workflow completion to the end so that
// same-task-post-completion commands are still accounted for.
//
// Note this only applies for successful activations that don't have completion
// reordering disabled and that are either not replaying or have the flag set.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be worth leaving a comment for any future readers explaining that the code below assumes there is only one completion command. Otherwise having "the completion" in comments could confuse readers, making them forget that there could be multiple.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can change the variable name to lastCompletionCommandIndex if that helps

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure yes, and ideally "the last completion" in comments rather than "the completion" but, that's prefixed by "nit"!

Copy link
Member Author

@cretz cretz Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated var name, yeah "the completion in question" is hopefully implied now

// Find the index of the completion command
var completionCommandIndex = -1;
if (completion.Successful != null)
Expand All @@ -1459,21 +1448,22 @@ private void ApplyCompletionCommandReordering(
}
workflowComplete = completionCommandIndex >= 0;

// This only applies for successful activations that have a completion not at the end,
// don't have completion reordering disabled, and that are either not replaying or have
// the flag set.
// In a previous version of .NET, if this was a successful activation completion with
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: (there might be a few other places in the repo)

Suggested change
// In a previous version of .NET, if this was a successful activation completion with
// In a previous version of .NET SDK, if this was a successful activation completion with

Copy link
Member Author

@cretz cretz Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit pedantic for a code comment, but I understand the ambiguity to the reader. Will change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a reviewer prefixes their comment with "nit", you can't complain it's pedantic :) Universal law of code review!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very true, I definitely missed that prefacing. My bad (and fixed anyways).

// a completion command not at the end, we'd reorder it to move at the end. However,
// this logic has now moved to core and become more robust. Therefore, we only apply
// this logic if we're replaying and flag is present so that workflows/histories that
// were created after this .NET flag but before the core flag still work.
if (completion.Successful == null ||
completionCommandIndex == -1 ||
completionCommandIndex == completion.Successful.Commands.Count - 1 ||
disableCompletionCommandReordering ||
(IsReplaying && !act.AvailableInternalFlags.Contains(
(uint)WorkflowLogicFlag.ReorderWorkflowCompletion)))
!IsReplaying ||
!act.AvailableInternalFlags.Contains((uint)WorkflowLogicFlag.ReorderWorkflowCompletion))
{
return;
}

// Now we know the completion is in the wrong spot and we're on a newer SDK, so set the
// SDK flag and move it
// Now we know that we're replaying w/ the flag set and the completion in the wrong
// spot, so set the SDK flag and move it
completion.Successful.UsedInternalFlags.Add((uint)WorkflowLogicFlag.ReorderWorkflowCompletion);
var compCmd = completion.Successful.Commands[completionCommandIndex];
completion.Successful.Commands.RemoveAt(completionCommandIndex);
Copy link
Contributor

@dandavison dandavison Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So just to try to recap this logic for myself and anyone else. It would require a fairly complicated diagram to set it all out, but here's one story:

  • This release of .NET will also be the first .NET release that has the new core command reordering code.

  • So, for example, let C be a completion command and N be a non-completion, and consider a workflow being processed today by a .NET worker on the recently released code. Suppose the raw lang sequence is N C1 N C2 N. The .NET flag is set, so .NET will change that to N C1 N N C2. And core will change that to N C1, since it's old core.

  • Now, if we release .NET workers based on this PR, the same thing will happen on replay, because the .NET flag will be set, and the new core internal flag will not be set, so both .NET and core will do their legacy behavior.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now consider a workflow born after this PR is released. Again the raw commands collected in lang are N C1 N C2 N. lang does not do anything to them, since lang's flag is not set, and core receives them unchanged. Core then does its new behavior, and changes them to N N N C1, and sets core's flag, so that this will also be done on replay.

Copy link
Member Author

@cretz cretz Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, everything in your statements is correct

Expand Down
6 changes: 1 addition & 5 deletions src/Temporalio/Worker/WorkflowInstanceDetails.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ namespace Temporalio.Worker
/// <param name="OnTaskCompleted">Callback for every instance task complete.</param>
/// <param name="RuntimeMetricMeter">Lazy runtime-level metric meter.</param>
/// <param name="WorkerLevelFailureExceptionTypes">Failure exception types at worker level.</param>
/// <param name="DisableCompletionCommandReordering">
/// Whether to disable completion command reordering.
/// </param>
internal record WorkflowInstanceDetails(
string Namespace,
string TaskQueue,
Expand All @@ -44,6 +41,5 @@ internal record WorkflowInstanceDetails(
Action<WorkflowInstance> OnTaskStarting,
Action<WorkflowInstance, Exception?> OnTaskCompleted,
Lazy<MetricMeter> RuntimeMetricMeter,
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes,
bool DisableCompletionCommandReordering);
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes);
}
3 changes: 1 addition & 2 deletions src/Temporalio/Worker/WorkflowReplayer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,7 @@ public WorkflowHistoryRunner(WorkflowReplayerOptions options, bool throwOnReplay
OnTaskStarting: options.OnTaskStarting,
OnTaskCompleted: options.OnTaskCompleted,
RuntimeMetricMeter: new(() => runtime.MetricMeter),
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes,
DisableCompletionCommandReordering: options.DisableWorkflowCompletionCommandReordering),
WorkerLevelFailureExceptionTypes: options.WorkflowFailureExceptionTypes),
(runId, removeFromCache) => SetResult(removeFromCache));
}
catch
Expand Down
9 changes: 0 additions & 9 deletions src/Temporalio/Worker/WorkflowReplayerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,6 @@ public class WorkflowReplayerOptions : ICloneable
internal Func<WorkflowInstanceDetails, IWorkflowInstance> WorkflowInstanceFactory { get; set; } =
TemporalWorkerOptions.DefaultWorkflowInstanceFactory;

/// <summary>
/// Gets or sets a value indicating whether the workflow completion command reordering will
/// apply.
/// </summary>
/// <remarks>
/// This is visible for testing only.
/// </remarks>
internal bool DisableWorkflowCompletionCommandReordering { get; set; }

/// <summary>
/// Add the given type as a workflow.
/// </summary>
Expand Down
3 changes: 1 addition & 2 deletions src/Temporalio/Worker/WorkflowWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,7 @@ private IWorkflowInstance CreateInstance(WorkflowActivation act)
OnTaskStarting: options.OnTaskStarting,
OnTaskCompleted: options.OnTaskCompleted,
RuntimeMetricMeter: options.RuntimeMetricMeter,
WorkerLevelFailureExceptionTypes: options.WorkerLevelFailureExceptionTypes,
DisableCompletionCommandReordering: options.DisableCompletionCommandReordering));
WorkerLevelFailureExceptionTypes: options.WorkerLevelFailureExceptionTypes));
}
}
}
3 changes: 1 addition & 2 deletions src/Temporalio/Worker/WorkflowWorkerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,5 @@ internal record WorkflowWorkerOptions(
Action<WorkflowInstance> OnTaskStarting,
Action<WorkflowInstance, Exception?> OnTaskCompleted,
Lazy<MetricMeter> RuntimeMetricMeter,
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes,
bool DisableCompletionCommandReordering);
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes);
}
9 changes: 4 additions & 5 deletions tests/Temporalio.Tests/TestUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@ namespace Temporalio.Tests;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using Temporalio.Client;
using Temporalio.Exceptions;
using Temporalio.Runtime;

public static class TestUtils
{
public static string CallerFilePath(
[System.Runtime.CompilerServices.CallerFilePath] string? callerPath = null)
{
return callerPath ?? throw new ArgumentException("Unable to find caller path");
}
public static string ReadAllFileText(
string relativePath, [CallerFilePath] string sourceFilePath = "") =>
File.ReadAllText(Path.Join(sourceFilePath, "..", relativePath));

public static int FreePort()
{
Expand Down
Loading