Skip to content

Commit

Permalink
Support for experimental workflow task events (#113)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Jul 24, 2023
1 parent 1c14789 commit dee4538
Show file tree
Hide file tree
Showing 13 changed files with 311 additions and 22 deletions.
4 changes: 3 additions & 1 deletion src/Temporalio/Worker/TemporalWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
WorkflowInstanceFactory: options.WorkflowInstanceFactory,
DebugMode: options.DebugMode,
DisableWorkflowTracingEventListener: options.DisableWorkflowTracingEventListener,
WorkflowStackTrace: options.WorkflowStackTrace));
WorkflowStackTrace: options.WorkflowStackTrace,
OnTaskStarting: options.OnTaskStarting,
OnTaskCompleted: options.OnTaskCompleted));
}
}

Expand Down
63 changes: 63 additions & 0 deletions src/Temporalio/Worker/TemporalWorkerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,44 @@ public TemporalWorkerOptions()
/// <param name="taskQueue">Task queue for the worker.</param>
public TemporalWorkerOptions(string taskQueue) => TaskQueue = taskQueue;

/// <summary>
/// Event for when a workflow task is starting. This should only be used for very advanced
/// scenarios.
/// </summary>
/// <remarks>
/// WARNING: This is experimental and may change in the future.
/// </remarks>
/// <remarks>
/// WARNING: As currently implemented, this does not currently represent workflow tasks as
/// Temporal server defines them. Rather this is SDK "activations" of which there may be
/// multiple in a single task if a user, for example, uses local activities. This may change
/// in the future.
/// </remarks>
/// <remarks>
/// WARNING: If a task fails (not a workflow failure, but a non-Temporal exception from a
/// task causing task failure), the task will continually retry causing new task events.
/// </remarks>
/// <remarks>
/// WARNING: In the case of a deadlock (i.e. task taking longer than 2 seconds), a
/// <see cref="WorkflowTaskCompleted" /> event may not occur.
/// </remarks>
/// <remarks>
/// WARNING: Adding/removing to/from this event or <see cref="WorkflowTaskCompleted" /> must
/// be done before constructing the worker. Since the worker clones these options on
/// construction, any alterations after construction will not apply.
/// </remarks>
public event EventHandler<WorkflowTaskStartingEventArgs>? WorkflowTaskStarting;

/// <summary>
/// Event for when a workflow task has completed but not yet sent back to the server. This
/// should only be used for very advanced scenarios.
/// </summary>
/// <remarks>
/// WARNING: This is experimental and there are many caveats about its use. It is important
/// to read the documentation on <see cref="WorkflowTaskStarting" />.
/// </remarks>
public event EventHandler<WorkflowTaskCompletedEventArgs>? WorkflowTaskCompleted;

/// <summary>
/// Gets or sets the task queue for the worker.
/// </summary>
Expand Down Expand Up @@ -294,5 +332,30 @@ public virtual object Clone()
options.workflows = new List<WorkflowDefinition>(Workflows);
return options;
}

/// <summary>
/// Callback for task starting.
/// </summary>
/// <param name="instance">Workflow instance.</param>
internal void OnTaskStarting(WorkflowInstance instance)
{
if (WorkflowTaskStarting is { } handler)
{
handler(instance, new(instance));
}
}

/// <summary>
/// Callback for task completed.
/// </summary>
/// <param name="instance">Workflow instance.</param>
/// <param name="failureException">Task failure exception.</param>
internal void OnTaskCompleted(WorkflowInstance instance, Exception? failureException)
{
if (WorkflowTaskCompleted is { } handler)
{
handler(instance, new(instance, failureException));
}
}
}
}
40 changes: 28 additions & 12 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ namespace Temporalio.Worker
internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowContext
{
private readonly TaskFactory taskFactory;
private readonly WorkflowDefinition defn;
private readonly IFailureConverter failureConverter;
private readonly Lazy<WorkflowInboundInterceptor> inbound;
private readonly Lazy<WorkflowOutboundInterceptor> outbound;
Expand Down Expand Up @@ -59,6 +58,8 @@ internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowCon
private readonly LinkedList<System.Diagnostics.StackTrace>? pendingTaskStackTraces;
private readonly ILogger logger;
private readonly ReplaySafeLogger replaySafeLogger;
private readonly Action<WorkflowInstance> onTaskStarting;
private readonly Action<WorkflowInstance, Exception?> onTaskCompleted;
private WorkflowActivationCompletion? completion;
// Will be set to null after last use (i.e. when workflow actually started)
private Lazy<object?[]>? startArgs;
Expand All @@ -79,9 +80,9 @@ internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowCon
public WorkflowInstance(WorkflowInstanceDetails details)
{
taskFactory = new(default, TaskCreationOptions.None, TaskContinuationOptions.ExecuteSynchronously, this);
defn = details.Definition;
dynamicQuery = defn.DynamicQuery;
dynamicSignal = defn.DynamicSignal;
Definition = details.Definition;
dynamicQuery = Definition.DynamicQuery;
dynamicSignal = Definition.DynamicSignal;
PayloadConverter = details.PayloadConverter;
failureConverter = details.FailureConverter;
var rootInbound = new InboundImpl(this);
Expand All @@ -104,8 +105,8 @@ public WorkflowInstance(WorkflowInstanceDetails details)
return rootInbound.Outbound!;
},
false);
mutableQueries = new(() => new(defn.Queries, OnQueryDefinitionAdded), false);
mutableSignals = new(() => new(defn.Signals, OnSignalDefinitionAdded), false);
mutableQueries = new(() => new(Definition.Queries, OnQueryDefinitionAdded), false);
mutableSignals = new(() => new(Definition.Signals, OnSignalDefinitionAdded), false);
var initialMemo = details.Start.Memo;
memo = new(
() => initialMemo == null ? new Dictionary<string, IRawValue>(0) :
Expand All @@ -122,10 +123,10 @@ public WorkflowInstance(WorkflowInstanceDetails details)
var start = details.Start;
startArgs = new(
() => DecodeArgs(
method: defn.RunMethod,
method: Definition.RunMethod,
payloads: start.Arguments,
itemName: $"Workflow {start.WorkflowType}",
dynamic: defn.Dynamic),
dynamic: Definition.Dynamic),
false);
initialSearchAttributes = details.Start.SearchAttributes;
WorkflowInfo.ParentInfo? parent = null;
Expand Down Expand Up @@ -157,6 +158,8 @@ public WorkflowInstance(WorkflowInstanceDetails details)
pendingTaskStackTraces = workflowStackTrace == WorkflowStackTrace.None ? null : new();
logger = details.LoggerFactory.CreateLogger($"Temporalio.Workflow:{start.WorkflowType}");
replaySafeLogger = new(logger);
onTaskStarting = details.OnTaskStarting;
onTaskCompleted = details.OnTaskCompleted;
Random = new(details.Start.RandomnessSeed);
TracingEventsEnabled = !details.DisableTracingEvents;
}
Expand Down Expand Up @@ -243,6 +246,11 @@ public WorkflowSignalDefinition? DynamicSignal
/// <inheritdoc />
public DateTime UtcNow { get; private set; }

/// <summary>
/// Gets the workflow definition.
/// </summary>
internal WorkflowDefinition Definition { get; private init; }

/// <summary>
/// Gets the instance, lazily creating if needed. This should never be called outside this
/// scheduler.
Expand All @@ -252,7 +260,7 @@ private object Instance
get
{
// We create this lazily because we want the constructor in a workflow context
instance ??= defn.CreateWorkflowInstance(startArgs!.Value);
instance ??= Definition.CreateWorkflowInstance(startArgs!.Value);
return instance;
}
}
Expand Down Expand Up @@ -446,7 +454,11 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act)
IsReplaying = act.IsReplaying;
UtcNow = act.Timestamp.ToDateTime();

// Starting callback
onTaskStarting(this);

// Run the event loop until yielded for each job
Exception? failureException = null;
try
{
var previousContext = SynchronizationContext.Current;
Expand All @@ -470,6 +482,7 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act)
}
catch (Exception e)
{
failureException = e;
logger.LogWarning(
e,
"Failed activation on workflow {WorkflowType} with ID {WorkflowId} and run ID {RunId}",
Expand Down Expand Up @@ -521,6 +534,9 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act)
// Unset the completion
var toReturn = completion;
completion = null;

// Completed callback
onTaskCompleted(this, failureException);
return toReturn;
}
}
Expand Down Expand Up @@ -827,7 +843,7 @@ private void ApplyQueryWorkflow(QueryWorkflow query)
else
{
// Find definition or fail
var queries = mutableQueries.IsValueCreated ? mutableQueries.Value : defn.Queries;
var queries = mutableQueries.IsValueCreated ? mutableQueries.Value : Definition.Queries;
if (!queries.TryGetValue(query.QueryType, out queryDefn))
{
queryDefn = DynamicQuery;
Expand Down Expand Up @@ -939,7 +955,7 @@ private void ApplyResolveSignalExternalWorkflow(ResolveSignalExternalWorkflow re
private void ApplySignalWorkflow(SignalWorkflow signal)
{
// Find applicable definition or buffer
var signals = mutableSignals.IsValueCreated ? mutableSignals.Value : defn.Signals;
var signals = mutableSignals.IsValueCreated ? mutableSignals.Value : Definition.Signals;
if (!signals.TryGetValue(signal.SignalName, out var signalDefn))
{
signalDefn = DynamicSignal;
Expand Down Expand Up @@ -988,7 +1004,7 @@ private void ApplyStartWorkflow(StartWorkflow start)
{
var input = new ExecuteWorkflowInput(
Instance: Instance,
RunMethod: defn.RunMethod,
RunMethod: Definition.RunMethod,
Args: startArgs!.Value);
// We no longer need start args after this point, so we are unsetting them
startArgs = null;
Expand Down
10 changes: 6 additions & 4 deletions src/Temporalio/Worker/WorkflowInstanceDetails.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using Temporalio.Bridge.Api.WorkflowActivation;
Expand All @@ -20,9 +21,8 @@ namespace Temporalio.Worker
/// <param name="LoggerFactory">Logger factory.</param>
/// <param name="DisableTracingEvents">Whether tracing events are disabled.</param>
/// <param name="WorkflowStackTrace">Option for workflow stack trace.</param>
/// <remarks>
/// This is built to be easily serializable in case we do want a sandbox one day.
/// </remarks>
/// <param name="OnTaskStarting">Callback for every instance task start.</param>
/// <param name="OnTaskCompleted">Callback for every instance task complete.</param>
internal record WorkflowInstanceDetails(
string Namespace,
string TaskQueue,
Expand All @@ -34,5 +34,7 @@ internal record WorkflowInstanceDetails(
IFailureConverter FailureConverter,
ILoggerFactory LoggerFactory,
bool DisableTracingEvents,
WorkflowStackTrace WorkflowStackTrace);
WorkflowStackTrace WorkflowStackTrace,
Action<WorkflowInstance> OnTaskStarting,
Action<WorkflowInstance, Exception?> OnTaskCompleted);
}
4 changes: 3 additions & 1 deletion src/Temporalio/Worker/WorkflowReplayer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,9 @@ public WorkflowHistoryRunner(WorkflowReplayerOptions options, bool throwOnReplay
WorkflowInstanceFactory: options.WorkflowInstanceFactory,
DebugMode: options.DebugMode,
DisableWorkflowTracingEventListener: options.DisableWorkflowTracingEventListener,
WorkflowStackTrace: WorkflowStackTrace.None),
WorkflowStackTrace: WorkflowStackTrace.None,
OnTaskStarting: options.OnTaskStarting,
OnTaskCompleted: options.OnTaskCompleted),
(runId, removeFromCache) => SetResult(removeFromCache));
}
catch
Expand Down
45 changes: 45 additions & 0 deletions src/Temporalio/Worker/WorkflowReplayerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,26 @@ public class WorkflowReplayerOptions : ICloneable
{
private IList<WorkflowDefinition> workflows = new List<WorkflowDefinition>();

/// <summary>
/// Event for when a workflow task is starting. This should only be used for very advanced
/// scenarios.
/// </summary>
/// <remarks>
/// WARNING: This is experimental and there are many caveats about its use. It is important
/// to read the documentation on <see cref="TemporalWorkerOptions.WorkflowTaskStarting" />.
/// </remarks>
public event EventHandler<WorkflowTaskStartingEventArgs>? WorkflowTaskStarting;

/// <summary>
/// Event for when a workflow task has completed but not yet sent back to the server. This
/// should only be used for very advanced scenarios.
/// </summary>
/// <remarks>
/// WARNING: This is experimental and there are many caveats about its use. It is important
/// to read the documentation on <see cref="TemporalWorkerOptions.WorkflowTaskStarting" />.
/// </remarks>
public event EventHandler<WorkflowTaskCompletedEventArgs>? WorkflowTaskCompleted;

/// <summary>
/// Gets the workflow definitions.
/// </summary>
Expand Down Expand Up @@ -140,5 +160,30 @@ public virtual object Clone()
options.workflows = new List<WorkflowDefinition>(Workflows);
return options;
}

/// <summary>
/// Callback for task starting.
/// </summary>
/// <param name="instance">Workflow instance.</param>
internal void OnTaskStarting(WorkflowInstance instance)
{
if (WorkflowTaskStarting is { } handler)
{
handler(instance, new(instance));
}
}

/// <summary>
/// Callback for task completed.
/// </summary>
/// <param name="instance">Workflow instance.</param>
/// <param name="failureException">Task failure exception.</param>
internal void OnTaskCompleted(WorkflowInstance instance, Exception? failureException)
{
if (WorkflowTaskCompleted is { } handler)
{
handler(instance, new(instance, failureException));
}
}
}
}
33 changes: 33 additions & 0 deletions src/Temporalio/Worker/WorkflowTaskCompletedEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;

namespace Temporalio.Worker
{
/// <summary>
/// Event arguments for workflow task completed events.
/// </summary>
/// <remarks>
/// WARNING: This is experimental and there are many caveats about its use. It is important to
/// read the documentation on <see cref="TemporalWorkerOptions.WorkflowTaskStarting" />.
/// </remarks>
public class WorkflowTaskCompletedEventArgs : WorkflowTaskEventArgs
{
/// <summary>
/// Initializes a new instance of the <see cref="WorkflowTaskCompletedEventArgs"/> class.
/// </summary>
/// <param name="workflowInstance">Workflow instance.</param>
/// <param name="taskFailureException">Task failure exception.</param>
internal WorkflowTaskCompletedEventArgs(
WorkflowInstance workflowInstance, Exception? taskFailureException)
: base(workflowInstance) => TaskFailureException = taskFailureException;

/// <summary>
/// Gets the task failure if any.
/// </summary>
/// <remarks>
/// This is the task failure not the workflow failure. Task failures occur on all exceptions
/// except Temporal exceptions thrown from the workflow. These cause the workflow to
/// continually retry until code is fixed to solve the exception.
/// </remarks>
public Exception? TaskFailureException { get; private init; }
}
}
33 changes: 33 additions & 0 deletions src/Temporalio/Worker/WorkflowTaskEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;

namespace Temporalio.Worker
{
/// <summary>
/// Base class for workflow task event arguments.
/// </summary>
/// <remarks>
/// WARNING: This is experimental and there are many caveats about its use. It is important to
/// read the documentation on <see cref="TemporalWorkerOptions.WorkflowTaskStarting" />.
/// </remarks>
public abstract class WorkflowTaskEventArgs : EventArgs
{
private readonly WorkflowInstance workflowInstance;

/// <summary>
/// Initializes a new instance of the <see cref="WorkflowTaskEventArgs"/> class.
/// </summary>
/// <param name="workflowInstance">Workflow instance.</param>
internal WorkflowTaskEventArgs(WorkflowInstance workflowInstance) =>
this.workflowInstance = workflowInstance;

/// <summary>
/// Gets the workflow information.
/// </summary>
public Workflows.WorkflowInfo WorkflowInfo => workflowInstance.Info;

/// <summary>
/// Gets the workflow definition.
/// </summary>
public Workflows.WorkflowDefinition WorkflowDefinition => workflowInstance.Definition;
}
}
Loading

0 comments on commit dee4538

Please sign in to comment.