diff --git a/src/Temporalio/Worker/TemporalWorker.cs b/src/Temporalio/Worker/TemporalWorker.cs index edd07760..40966b42 100644 --- a/src/Temporalio/Worker/TemporalWorker.cs +++ b/src/Temporalio/Worker/TemporalWorker.cs @@ -81,7 +81,9 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options) WorkflowInstanceFactory: options.WorkflowInstanceFactory, DebugMode: options.DebugMode, DisableWorkflowTracingEventListener: options.DisableWorkflowTracingEventListener, - WorkflowStackTrace: options.WorkflowStackTrace)); + WorkflowStackTrace: options.WorkflowStackTrace, + OnTaskStarting: options.OnTaskStarting, + OnTaskCompleted: options.OnTaskCompleted)); } } diff --git a/src/Temporalio/Worker/TemporalWorkerOptions.cs b/src/Temporalio/Worker/TemporalWorkerOptions.cs index 6ab02ea3..db42e2f0 100644 --- a/src/Temporalio/Worker/TemporalWorkerOptions.cs +++ b/src/Temporalio/Worker/TemporalWorkerOptions.cs @@ -36,6 +36,44 @@ public TemporalWorkerOptions() /// Task queue for the worker. public TemporalWorkerOptions(string taskQueue) => TaskQueue = taskQueue; + /// + /// Event for when a workflow task is starting. This should only be used for very advanced + /// scenarios. + /// + /// + /// WARNING: This is experimental and may change in the future. + /// + /// + /// WARNING: As currently implemented, this does not currently represent workflow tasks as + /// Temporal server defines them. Rather this is SDK "activations" of which there may be + /// multiple in a single task if a user, for example, uses local activities. This may change + /// in the future. + /// + /// + /// WARNING: If a task fails (not a workflow failure, but a non-Temporal exception from a + /// task causing task failure), the task will continually retry causing new task events. + /// + /// + /// WARNING: In the case of a deadlock (i.e. task taking longer than 2 seconds), a + /// event may not occur. + /// + /// + /// WARNING: Adding/removing to/from this event or must + /// be done before constructing the worker. Since the worker clones these options on + /// construction, any alterations after construction will not apply. + /// + public event EventHandler? WorkflowTaskStarting; + + /// + /// Event for when a workflow task has completed but not yet sent back to the server. This + /// should only be used for very advanced scenarios. + /// + /// + /// WARNING: This is experimental and there are many caveats about its use. It is important + /// to read the documentation on . + /// + public event EventHandler? WorkflowTaskCompleted; + /// /// Gets or sets the task queue for the worker. /// @@ -294,5 +332,30 @@ public virtual object Clone() options.workflows = new List(Workflows); return options; } + + /// + /// Callback for task starting. + /// + /// Workflow instance. + internal void OnTaskStarting(WorkflowInstance instance) + { + if (WorkflowTaskStarting is { } handler) + { + handler(instance, new(instance)); + } + } + + /// + /// Callback for task completed. + /// + /// Workflow instance. + /// Task failure exception. + internal void OnTaskCompleted(WorkflowInstance instance, Exception? failureException) + { + if (WorkflowTaskCompleted is { } handler) + { + handler(instance, new(instance, failureException)); + } + } } } \ No newline at end of file diff --git a/src/Temporalio/Worker/WorkflowInstance.cs b/src/Temporalio/Worker/WorkflowInstance.cs index b059b22e..d85c19ff 100644 --- a/src/Temporalio/Worker/WorkflowInstance.cs +++ b/src/Temporalio/Worker/WorkflowInstance.cs @@ -29,7 +29,6 @@ namespace Temporalio.Worker internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowContext { private readonly TaskFactory taskFactory; - private readonly WorkflowDefinition defn; private readonly IFailureConverter failureConverter; private readonly Lazy inbound; private readonly Lazy outbound; @@ -59,6 +58,8 @@ internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowCon private readonly LinkedList? pendingTaskStackTraces; private readonly ILogger logger; private readonly ReplaySafeLogger replaySafeLogger; + private readonly Action onTaskStarting; + private readonly Action onTaskCompleted; private WorkflowActivationCompletion? completion; // Will be set to null after last use (i.e. when workflow actually started) private Lazy? startArgs; @@ -79,9 +80,9 @@ internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowCon public WorkflowInstance(WorkflowInstanceDetails details) { taskFactory = new(default, TaskCreationOptions.None, TaskContinuationOptions.ExecuteSynchronously, this); - defn = details.Definition; - dynamicQuery = defn.DynamicQuery; - dynamicSignal = defn.DynamicSignal; + Definition = details.Definition; + dynamicQuery = Definition.DynamicQuery; + dynamicSignal = Definition.DynamicSignal; PayloadConverter = details.PayloadConverter; failureConverter = details.FailureConverter; var rootInbound = new InboundImpl(this); @@ -104,8 +105,8 @@ public WorkflowInstance(WorkflowInstanceDetails details) return rootInbound.Outbound!; }, false); - mutableQueries = new(() => new(defn.Queries, OnQueryDefinitionAdded), false); - mutableSignals = new(() => new(defn.Signals, OnSignalDefinitionAdded), false); + mutableQueries = new(() => new(Definition.Queries, OnQueryDefinitionAdded), false); + mutableSignals = new(() => new(Definition.Signals, OnSignalDefinitionAdded), false); var initialMemo = details.Start.Memo; memo = new( () => initialMemo == null ? new Dictionary(0) : @@ -122,10 +123,10 @@ public WorkflowInstance(WorkflowInstanceDetails details) var start = details.Start; startArgs = new( () => DecodeArgs( - method: defn.RunMethod, + method: Definition.RunMethod, payloads: start.Arguments, itemName: $"Workflow {start.WorkflowType}", - dynamic: defn.Dynamic), + dynamic: Definition.Dynamic), false); initialSearchAttributes = details.Start.SearchAttributes; WorkflowInfo.ParentInfo? parent = null; @@ -157,6 +158,8 @@ public WorkflowInstance(WorkflowInstanceDetails details) pendingTaskStackTraces = workflowStackTrace == WorkflowStackTrace.None ? null : new(); logger = details.LoggerFactory.CreateLogger($"Temporalio.Workflow:{start.WorkflowType}"); replaySafeLogger = new(logger); + onTaskStarting = details.OnTaskStarting; + onTaskCompleted = details.OnTaskCompleted; Random = new(details.Start.RandomnessSeed); TracingEventsEnabled = !details.DisableTracingEvents; } @@ -243,6 +246,11 @@ public WorkflowSignalDefinition? DynamicSignal /// public DateTime UtcNow { get; private set; } + /// + /// Gets the workflow definition. + /// + internal WorkflowDefinition Definition { get; private init; } + /// /// Gets the instance, lazily creating if needed. This should never be called outside this /// scheduler. @@ -252,7 +260,7 @@ private object Instance get { // We create this lazily because we want the constructor in a workflow context - instance ??= defn.CreateWorkflowInstance(startArgs!.Value); + instance ??= Definition.CreateWorkflowInstance(startArgs!.Value); return instance; } } @@ -446,7 +454,11 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act) IsReplaying = act.IsReplaying; UtcNow = act.Timestamp.ToDateTime(); + // Starting callback + onTaskStarting(this); + // Run the event loop until yielded for each job + Exception? failureException = null; try { var previousContext = SynchronizationContext.Current; @@ -470,6 +482,7 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act) } catch (Exception e) { + failureException = e; logger.LogWarning( e, "Failed activation on workflow {WorkflowType} with ID {WorkflowId} and run ID {RunId}", @@ -521,6 +534,9 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act) // Unset the completion var toReturn = completion; completion = null; + + // Completed callback + onTaskCompleted(this, failureException); return toReturn; } } @@ -827,7 +843,7 @@ private void ApplyQueryWorkflow(QueryWorkflow query) else { // Find definition or fail - var queries = mutableQueries.IsValueCreated ? mutableQueries.Value : defn.Queries; + var queries = mutableQueries.IsValueCreated ? mutableQueries.Value : Definition.Queries; if (!queries.TryGetValue(query.QueryType, out queryDefn)) { queryDefn = DynamicQuery; @@ -939,7 +955,7 @@ private void ApplyResolveSignalExternalWorkflow(ResolveSignalExternalWorkflow re private void ApplySignalWorkflow(SignalWorkflow signal) { // Find applicable definition or buffer - var signals = mutableSignals.IsValueCreated ? mutableSignals.Value : defn.Signals; + var signals = mutableSignals.IsValueCreated ? mutableSignals.Value : Definition.Signals; if (!signals.TryGetValue(signal.SignalName, out var signalDefn)) { signalDefn = DynamicSignal; @@ -988,7 +1004,7 @@ private void ApplyStartWorkflow(StartWorkflow start) { var input = new ExecuteWorkflowInput( Instance: Instance, - RunMethod: defn.RunMethod, + RunMethod: Definition.RunMethod, Args: startArgs!.Value); // We no longer need start args after this point, so we are unsetting them startArgs = null; diff --git a/src/Temporalio/Worker/WorkflowInstanceDetails.cs b/src/Temporalio/Worker/WorkflowInstanceDetails.cs index 223cf649..648b410d 100644 --- a/src/Temporalio/Worker/WorkflowInstanceDetails.cs +++ b/src/Temporalio/Worker/WorkflowInstanceDetails.cs @@ -1,3 +1,4 @@ +using System; using System.Collections.Generic; using Microsoft.Extensions.Logging; using Temporalio.Bridge.Api.WorkflowActivation; @@ -20,9 +21,8 @@ namespace Temporalio.Worker /// Logger factory. /// Whether tracing events are disabled. /// Option for workflow stack trace. - /// - /// This is built to be easily serializable in case we do want a sandbox one day. - /// + /// Callback for every instance task start. + /// Callback for every instance task complete. internal record WorkflowInstanceDetails( string Namespace, string TaskQueue, @@ -34,5 +34,7 @@ internal record WorkflowInstanceDetails( IFailureConverter FailureConverter, ILoggerFactory LoggerFactory, bool DisableTracingEvents, - WorkflowStackTrace WorkflowStackTrace); + WorkflowStackTrace WorkflowStackTrace, + Action OnTaskStarting, + Action OnTaskCompleted); } \ No newline at end of file diff --git a/src/Temporalio/Worker/WorkflowReplayer.cs b/src/Temporalio/Worker/WorkflowReplayer.cs index 07c4bbb7..096902a8 100644 --- a/src/Temporalio/Worker/WorkflowReplayer.cs +++ b/src/Temporalio/Worker/WorkflowReplayer.cs @@ -169,7 +169,9 @@ public WorkflowHistoryRunner(WorkflowReplayerOptions options, bool throwOnReplay WorkflowInstanceFactory: options.WorkflowInstanceFactory, DebugMode: options.DebugMode, DisableWorkflowTracingEventListener: options.DisableWorkflowTracingEventListener, - WorkflowStackTrace: WorkflowStackTrace.None), + WorkflowStackTrace: WorkflowStackTrace.None, + OnTaskStarting: options.OnTaskStarting, + OnTaskCompleted: options.OnTaskCompleted), (runId, removeFromCache) => SetResult(removeFromCache)); } catch diff --git a/src/Temporalio/Worker/WorkflowReplayerOptions.cs b/src/Temporalio/Worker/WorkflowReplayerOptions.cs index 7cb4940a..05ef9f15 100644 --- a/src/Temporalio/Worker/WorkflowReplayerOptions.cs +++ b/src/Temporalio/Worker/WorkflowReplayerOptions.cs @@ -14,6 +14,26 @@ public class WorkflowReplayerOptions : ICloneable { private IList workflows = new List(); + /// + /// Event for when a workflow task is starting. This should only be used for very advanced + /// scenarios. + /// + /// + /// WARNING: This is experimental and there are many caveats about its use. It is important + /// to read the documentation on . + /// + public event EventHandler? WorkflowTaskStarting; + + /// + /// Event for when a workflow task has completed but not yet sent back to the server. This + /// should only be used for very advanced scenarios. + /// + /// + /// WARNING: This is experimental and there are many caveats about its use. It is important + /// to read the documentation on . + /// + public event EventHandler? WorkflowTaskCompleted; + /// /// Gets the workflow definitions. /// @@ -140,5 +160,30 @@ public virtual object Clone() options.workflows = new List(Workflows); return options; } + + /// + /// Callback for task starting. + /// + /// Workflow instance. + internal void OnTaskStarting(WorkflowInstance instance) + { + if (WorkflowTaskStarting is { } handler) + { + handler(instance, new(instance)); + } + } + + /// + /// Callback for task completed. + /// + /// Workflow instance. + /// Task failure exception. + internal void OnTaskCompleted(WorkflowInstance instance, Exception? failureException) + { + if (WorkflowTaskCompleted is { } handler) + { + handler(instance, new(instance, failureException)); + } + } } } \ No newline at end of file diff --git a/src/Temporalio/Worker/WorkflowTaskCompletedEventArgs.cs b/src/Temporalio/Worker/WorkflowTaskCompletedEventArgs.cs new file mode 100644 index 00000000..5a2eb252 --- /dev/null +++ b/src/Temporalio/Worker/WorkflowTaskCompletedEventArgs.cs @@ -0,0 +1,33 @@ +using System; + +namespace Temporalio.Worker +{ + /// + /// Event arguments for workflow task completed events. + /// + /// + /// WARNING: This is experimental and there are many caveats about its use. It is important to + /// read the documentation on . + /// + public class WorkflowTaskCompletedEventArgs : WorkflowTaskEventArgs + { + /// + /// Initializes a new instance of the class. + /// + /// Workflow instance. + /// Task failure exception. + internal WorkflowTaskCompletedEventArgs( + WorkflowInstance workflowInstance, Exception? taskFailureException) + : base(workflowInstance) => TaskFailureException = taskFailureException; + + /// + /// Gets the task failure if any. + /// + /// + /// This is the task failure not the workflow failure. Task failures occur on all exceptions + /// except Temporal exceptions thrown from the workflow. These cause the workflow to + /// continually retry until code is fixed to solve the exception. + /// + public Exception? TaskFailureException { get; private init; } + } +} \ No newline at end of file diff --git a/src/Temporalio/Worker/WorkflowTaskEventArgs.cs b/src/Temporalio/Worker/WorkflowTaskEventArgs.cs new file mode 100644 index 00000000..ae963f10 --- /dev/null +++ b/src/Temporalio/Worker/WorkflowTaskEventArgs.cs @@ -0,0 +1,33 @@ +using System; + +namespace Temporalio.Worker +{ + /// + /// Base class for workflow task event arguments. + /// + /// + /// WARNING: This is experimental and there are many caveats about its use. It is important to + /// read the documentation on . + /// + public abstract class WorkflowTaskEventArgs : EventArgs + { + private readonly WorkflowInstance workflowInstance; + + /// + /// Initializes a new instance of the class. + /// + /// Workflow instance. + internal WorkflowTaskEventArgs(WorkflowInstance workflowInstance) => + this.workflowInstance = workflowInstance; + + /// + /// Gets the workflow information. + /// + public Workflows.WorkflowInfo WorkflowInfo => workflowInstance.Info; + + /// + /// Gets the workflow definition. + /// + public Workflows.WorkflowDefinition WorkflowDefinition => workflowInstance.Definition; + } +} \ No newline at end of file diff --git a/src/Temporalio/Worker/WorkflowTaskStartingEventArgs.cs b/src/Temporalio/Worker/WorkflowTaskStartingEventArgs.cs new file mode 100644 index 00000000..d7039522 --- /dev/null +++ b/src/Temporalio/Worker/WorkflowTaskStartingEventArgs.cs @@ -0,0 +1,21 @@ +namespace Temporalio.Worker +{ + /// + /// Event arguments for workflow task starting events. + /// + /// + /// WARNING: This is experimental and there are many caveats about its use. It is important to + /// read the documentation on . + /// + public class WorkflowTaskStartingEventArgs : WorkflowTaskEventArgs + { + /// + /// Initializes a new instance of the class. + /// + /// Workflow instance. + internal WorkflowTaskStartingEventArgs(WorkflowInstance workflowInstance) + : base(workflowInstance) + { + } + } +} \ No newline at end of file diff --git a/src/Temporalio/Worker/WorkflowWorker.cs b/src/Temporalio/Worker/WorkflowWorker.cs index 90f0cef2..a4df06dc 100644 --- a/src/Temporalio/Worker/WorkflowWorker.cs +++ b/src/Temporalio/Worker/WorkflowWorker.cs @@ -279,7 +279,9 @@ private IWorkflowInstance CreateInstance(WorkflowActivation act) FailureConverter: options.DataConverter.FailureConverter, LoggerFactory: options.LoggerFactory, DisableTracingEvents: options.DisableWorkflowTracingEventListener, - WorkflowStackTrace: options.WorkflowStackTrace)); + WorkflowStackTrace: options.WorkflowStackTrace, + OnTaskStarting: options.OnTaskStarting, + OnTaskCompleted: options.OnTaskCompleted)); } } } \ No newline at end of file diff --git a/src/Temporalio/Worker/WorkflowWorkerOptions.cs b/src/Temporalio/Worker/WorkflowWorkerOptions.cs index 390ec97d..05f4ab5c 100644 --- a/src/Temporalio/Worker/WorkflowWorkerOptions.cs +++ b/src/Temporalio/Worker/WorkflowWorkerOptions.cs @@ -16,5 +16,7 @@ internal record WorkflowWorkerOptions( Func WorkflowInstanceFactory, bool DebugMode, bool DisableWorkflowTracingEventListener, - WorkflowStackTrace WorkflowStackTrace); + WorkflowStackTrace WorkflowStackTrace, + Action OnTaskStarting, + Action OnTaskCompleted); } \ No newline at end of file diff --git a/src/Temporalio/Workflows/WorkflowInfo.cs b/src/Temporalio/Workflows/WorkflowInfo.cs index 76b5826c..10bb64b4 100644 --- a/src/Temporalio/Workflows/WorkflowInfo.cs +++ b/src/Temporalio/Workflows/WorkflowInfo.cs @@ -5,7 +5,7 @@ namespace Temporalio.Workflows { /// - /// Information about the running workflow. + /// Information about the running workflow. This is immutable for the life of the workflow run. /// /// Current workflow attempt. /// Run ID if this was continued. @@ -45,7 +45,7 @@ public record WorkflowInfo( { /// /// Gets the value that is set on - /// before this activity is + /// before this workflow is /// started. /// internal Dictionary LoggerScope { get; } = new() diff --git a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs index 683a06a9..d1336a4c 100644 --- a/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs +++ b/tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs @@ -3,6 +3,7 @@ namespace Temporalio.Tests.Worker; +using System.Collections.Concurrent; using System.Threading.Tasks.Dataflow; using Microsoft.Extensions.Logging; using Temporalio.Activities; @@ -2944,6 +2945,73 @@ await ExecuteWorkerAsync( }); } + [Workflow] + public class TaskEventsWorkflow + { + [WorkflowRun] + public async Task RunAsync() + { + // Wait for cancel, then throw a task failure + try + { + await Workflow.DelayAsync(TimeSpan.FromDays(5)); + } + catch (TaskCanceledException) + { + throw new InvalidOperationException("Intentional task failure"); + } + } + } + + [Fact] + public async Task ExecuteWorkflowAsync_TaskEvents_AreRecordedProperly() + { + // Track events + var workerOptions = new TemporalWorkerOptions(); + var startingEvents = new ConcurrentQueue(); + var completedEvents = new ConcurrentQueue(); + EventHandler startingHandler = (_, e) => startingEvents.Enqueue(e); + EventHandler completedHandler = (_, e) => completedEvents.Enqueue(e); + workerOptions.WorkflowTaskStarting += startingHandler; + workerOptions.WorkflowTaskCompleted += completedHandler; + + // Run worker + await ExecuteWorkerAsync( + async worker => + { + // Remove the handlers to prove that altering event takes no effect after start + workerOptions.WorkflowTaskStarting -= startingHandler; + workerOptions.WorkflowTaskCompleted -= completedHandler; + + // Start + var handle = await Env.Client.StartWorkflowAsync( + (TaskEventsWorkflow wf) => wf.RunAsync(), + new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)); + + // Wait for timer start to appear in history + await AssertHasEventEventuallyAsync(handle, e => e.TimerStartedEventAttributes != null); + + // Confirm events + Assert.Single(startingEvents); + Assert.Equal("TaskEventsWorkflow", startingEvents.Single().WorkflowDefinition.Name); + Assert.Equal(handle.Id, startingEvents.Single().WorkflowInfo.WorkflowId); + Assert.Equal(handle.ResultRunId, startingEvents.Single().WorkflowInfo.RunId); + Assert.Single(completedEvents); + Assert.Equal(handle.Id, completedEvents.Single().WorkflowInfo.WorkflowId); + Assert.Null(completedEvents.Single().TaskFailureException); + + // Now cancel, wait for task failure in history, and confirm task failure appears in + // events + await handle.CancelAsync(); + await AssertTaskFailureContainsEventuallyAsync(handle, "Intentional task failure"); + Assert.True(startingEvents.Count >= 2); + Assert.True(completedEvents.Count >= 2); + var exc = Assert.IsType(completedEvents.ElementAt(1).TaskFailureException); + Assert.Equal("Intentional task failure", exc.Message); + }, + workerOptions); + } + private async Task ExecuteWorkerAsync( Func action, TemporalWorkerOptions? options = null,