From 3c6f0d4d001c59a801ff5fa63d3b124d8b8f53c3 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Tue, 16 Jun 2020 11:14:54 -0700 Subject: [PATCH 1/2] Add TaskEventId to function traces --- .../DurableTaskExtension.cs | 12 ++++++ .../EndToEndTraceHelper.cs | 37 +++++++++++++------ .../EtwEventSource.cs | 15 +++++--- .../Listener/TaskActivityShim.cs | 16 ++++++-- 4 files changed, 60 insertions(+), 20 deletions(-) diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs index a600bcde4..88f4ddea0 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs @@ -291,6 +291,7 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context) .BindToTrigger(new EntityTriggerAttributeBindingProvider(this, context, storageConnectionString, this.TraceHelper)); this.taskHubWorker = new TaskHubWorker(this.defaultDurabilityProvider, this, this); + this.taskHubWorker.AddActivityDispatcherMiddleware(this.ActivityMiddleware); this.taskHubWorker.AddOrchestrationDispatcherMiddleware(this.EntityMiddleware); this.taskHubWorker.AddOrchestrationDispatcherMiddleware(this.OrchestrationMiddleware); @@ -465,6 +466,17 @@ TaskActivity INameVersionObjectManager.GetObject(string name, stri return new TaskActivityShim(this, info.Executor, this.hostLifetimeService, name); } + private Task ActivityMiddleware(DispatchMiddlewareContext dispatchContext, Func next) + { + if (dispatchContext.GetProperty() is TaskActivityShim shim) + { + TaskScheduledEvent @event = dispatchContext.GetProperty(); + shim.SetTaskEventId(@event?.EventId ?? -1); + } + + return next(); + } + private async Task OrchestrationMiddleware(DispatchMiddlewareContext dispatchContext, Func next) { TaskOrchestrationShim shim = dispatchContext.GetProperty() as TaskOrchestrationShim; diff --git a/src/WebJobs.Extensions.DurableTask/EndToEndTraceHelper.cs b/src/WebJobs.Extensions.DurableTask/EndToEndTraceHelper.cs index 207e02de8..a1cb93b43 100644 --- a/src/WebJobs.Extensions.DurableTask/EndToEndTraceHelper.cs +++ b/src/WebJobs.Extensions.DurableTask/EndToEndTraceHelper.cs @@ -128,13 +128,15 @@ public void FunctionStarting( string instanceId, string input, FunctionType functionType, - bool isReplay) + bool isReplay, + int taskEventId = -1) { EtwEventSource.Instance.FunctionStarting( hubName, LocalAppName, LocalSlotName, functionName, + taskEventId, instanceId, input, functionType.ToString(), @@ -144,9 +146,9 @@ public void FunctionStarting( if (this.ShouldLogEvent(isReplay)) { this.logger.LogInformation( - "{instanceId}: Function '{functionName} ({functionType})' started. IsReplay: {isReplay}. Input: {input}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.", + "{instanceId}: Function '{functionName} ({functionType})' started. IsReplay: {isReplay}. Input: {input}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}. TaskEventId: {taskEventId}", instanceId, functionName, functionType, isReplay, input, FunctionState.Started, hubName, - LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++); + LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++, taskEventId); } } @@ -212,13 +214,15 @@ public void FunctionCompleted( string output, bool continuedAsNew, FunctionType functionType, - bool isReplay) + bool isReplay, + int taskEventId = -1) { EtwEventSource.Instance.FunctionCompleted( hubName, LocalAppName, LocalSlotName, functionName, + taskEventId, instanceId, output, continuedAsNew, @@ -229,9 +233,9 @@ public void FunctionCompleted( if (this.ShouldLogEvent(isReplay)) { this.logger.LogInformation( - "{instanceId}: Function '{functionName} ({functionType})' completed. ContinuedAsNew: {continuedAsNew}. IsReplay: {isReplay}. Output: {output}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.", + "{instanceId}: Function '{functionName} ({functionType})' completed. ContinuedAsNew: {continuedAsNew}. IsReplay: {isReplay}. Output: {output}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}. TaskEventId: {taskEventId}", instanceId, functionName, functionType, continuedAsNew, isReplay, output, FunctionState.Completed, - hubName, LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++); + hubName, LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++, taskEventId); } } @@ -297,16 +301,27 @@ public void FunctionFailed( string instanceId, string reason, FunctionType functionType, - bool isReplay) + bool isReplay, + int taskEventId = -1) { - EtwEventSource.Instance.FunctionFailed(hubName, LocalAppName, LocalSlotName, functionName, - instanceId, reason, functionType.ToString(), ExtensionVersion, isReplay); + EtwEventSource.Instance.FunctionFailed( + hubName, + LocalAppName, + LocalSlotName, + functionName, + taskEventId, + instanceId, + reason, + functionType.ToString(), + ExtensionVersion, + isReplay); + if (this.ShouldLogEvent(isReplay)) { this.logger.LogError( - "{instanceId}: Function '{functionName} ({functionType})' failed with an error. Reason: {reason}. IsReplay: {isReplay}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}.", + "{instanceId}: Function '{functionName} ({functionType})' failed with an error. Reason: {reason}. IsReplay: {isReplay}. State: {state}. HubName: {hubName}. AppName: {appName}. SlotName: {slotName}. ExtensionVersion: {extensionVersion}. SequenceNumber: {sequenceNumber}. TaskEventId: {taskEventId}", instanceId, functionName, functionType, reason, isReplay, FunctionState.Failed, hubName, - LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++); + LocalAppName, LocalSlotName, ExtensionVersion, this.sequenceNumber++, taskEventId); } } diff --git a/src/WebJobs.Extensions.DurableTask/EtwEventSource.cs b/src/WebJobs.Extensions.DurableTask/EtwEventSource.cs index 40ab4a6f1..3aa28b0e0 100644 --- a/src/WebJobs.Extensions.DurableTask/EtwEventSource.cs +++ b/src/WebJobs.Extensions.DurableTask/EtwEventSource.cs @@ -34,19 +34,20 @@ public void FunctionScheduled( this.WriteEvent(201, TaskHub, AppName, SlotName, FunctionName, InstanceId, Reason, FunctionType, ExtensionVersion, IsReplay); } - [Event(202, Level = EventLevel.Informational, Version = 4)] + [Event(202, Level = EventLevel.Informational, Version = 5)] public void FunctionStarting( string TaskHub, string AppName, string SlotName, string FunctionName, + int TaskEventId, string InstanceId, string Input, string FunctionType, string ExtensionVersion, bool IsReplay) { - this.WriteEvent(202, TaskHub, AppName, SlotName, FunctionName, InstanceId, Input ?? "(null)", FunctionType, ExtensionVersion, IsReplay); + this.WriteEvent(202, TaskHub, AppName, SlotName, FunctionName, TaskEventId, InstanceId, Input ?? "(null)", FunctionType, ExtensionVersion, IsReplay); } [Event(203, Level = EventLevel.Informational, Version = 4)] @@ -94,12 +95,13 @@ public void ExternalEventRaised( this.WriteEvent(205, TaskHub, AppName, SlotName, FunctionName, InstanceId, EventName, Input ?? "(null)", FunctionType, ExtensionVersion, IsReplay); } - [Event(206, Level = EventLevel.Informational, Version = 4)] + [Event(206, Level = EventLevel.Informational, Version = 5)] public void FunctionCompleted( string TaskHub, string AppName, string SlotName, string FunctionName, + int TaskEventId, string InstanceId, string Output, bool ContinuedAsNew, @@ -107,7 +109,7 @@ public void FunctionCompleted( string ExtensionVersion, bool IsReplay) { - this.WriteEvent(206, TaskHub, AppName, SlotName, FunctionName, InstanceId, Output ?? "(null)", ContinuedAsNew, FunctionType, ExtensionVersion, IsReplay); + this.WriteEvent(206, TaskHub, AppName, SlotName, FunctionName, TaskEventId, InstanceId, Output ?? "(null)", ContinuedAsNew, FunctionType, ExtensionVersion, IsReplay); } [Event(207, Level = EventLevel.Warning, Version = 2)] @@ -125,19 +127,20 @@ public void FunctionTerminated( this.WriteEvent(207, TaskHub, AppName, SlotName, FunctionName, InstanceId, Reason, FunctionType, ExtensionVersion, IsReplay); } - [Event(208, Level = EventLevel.Error, Version = 3)] + [Event(208, Level = EventLevel.Error, Version = 4)] public void FunctionFailed( string TaskHub, string AppName, string SlotName, string FunctionName, + int TaskEventId, string InstanceId, string Reason, string FunctionType, string ExtensionVersion, bool IsReplay) { - this.WriteEvent(208, TaskHub, AppName, SlotName, FunctionName, InstanceId, Reason, FunctionType, ExtensionVersion, IsReplay); + this.WriteEvent(208, TaskHub, AppName, SlotName, FunctionName, TaskEventId, InstanceId, Reason, FunctionType, ExtensionVersion, IsReplay); } [Event(209, Level = EventLevel.Informational, Version = 2)] diff --git a/src/WebJobs.Extensions.DurableTask/Listener/TaskActivityShim.cs b/src/WebJobs.Extensions.DurableTask/Listener/TaskActivityShim.cs index 96db8dbb1..72921acae 100644 --- a/src/WebJobs.Extensions.DurableTask/Listener/TaskActivityShim.cs +++ b/src/WebJobs.Extensions.DurableTask/Listener/TaskActivityShim.cs @@ -23,6 +23,8 @@ internal class TaskActivityShim : TaskActivity private readonly IApplicationLifetimeWrapper hostServiceLifetime; private readonly string activityName; + private int taskEventId = -1; + public TaskActivityShim( DurableTaskExtension config, ITriggeredFunctionExecutor executor, @@ -56,7 +58,8 @@ public override async Task RunAsync(TaskContext context, string rawInput instanceId, this.config.GetIntputOutputTrace(rawInput), functionType: FunctionType.Activity, - isReplay: false); + isReplay: false, + taskEventId: this.taskEventId); WrappedFunctionResult result = await FunctionExecutionHelper.ExecuteActivityFunction( this.executor, @@ -74,7 +77,8 @@ public override async Task RunAsync(TaskContext context, string rawInput this.config.GetIntputOutputTrace(serializedOutput), continuedAsNew: false, functionType: FunctionType.Activity, - isReplay: false); + isReplay: false, + taskEventId: this.taskEventId); return serializedOutput; case WrappedFunctionResult.FunctionResultStatus.FunctionsRuntimeError: @@ -106,7 +110,8 @@ public override async Task RunAsync(TaskContext context, string rawInput instanceId, exceptionToReport?.ToString() ?? string.Empty, functionType: FunctionType.Activity, - isReplay: false); + isReplay: false, + taskEventId: this.taskEventId); throw new TaskFailureException( $"Activity function '{this.activityName}' failed: {exceptionToReport.Message}", @@ -122,6 +127,11 @@ public override string Run(TaskContext context, string input) throw new NotImplementedException(); } + internal void SetTaskEventId(int taskEventId) + { + this.taskEventId = taskEventId; + } + private static Exception StripFunctionInvocationException(Exception e) { var infrastructureException = e as FunctionInvocationException; From f3f6f117972823cba111ba9047a6965c9dbc70c3 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Tue, 16 Jun 2020 15:40:55 -0700 Subject: [PATCH 2/2] PR feedback - comments --- .../DurableTaskExtension.cs | 22 +++++++++++++++++++ .../Listener/TaskActivityShim.cs | 6 ++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs index 88f4ddea0..7e6b1a615 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs @@ -291,6 +291,9 @@ void IExtensionConfigProvider.Initialize(ExtensionConfigContext context) .BindToTrigger(new EntityTriggerAttributeBindingProvider(this, context, storageConnectionString, this.TraceHelper)); this.taskHubWorker = new TaskHubWorker(this.defaultDurabilityProvider, this, this); + + // Add middleware to the DTFx dispatcher so that we can inject our own logic + // into and customize the orchestration execution pipeline. this.taskHubWorker.AddActivityDispatcherMiddleware(this.ActivityMiddleware); this.taskHubWorker.AddOrchestrationDispatcherMiddleware(this.EntityMiddleware); this.taskHubWorker.AddOrchestrationDispatcherMiddleware(this.OrchestrationMiddleware); @@ -466,6 +469,12 @@ TaskActivity INameVersionObjectManager.GetObject(string name, stri return new TaskActivityShim(this, info.Executor, this.hostLifetimeService, name); } + /// + /// This DTFx activity middleware allows us to add context to the activity function shim + /// before it actually starts running. + /// + /// A property bag containing useful DTFx context. + /// The handler for running the next middleware in the pipeline. private Task ActivityMiddleware(DispatchMiddlewareContext dispatchContext, Func next) { if (dispatchContext.GetProperty() is TaskActivityShim shim) @@ -474,9 +483,16 @@ private Task ActivityMiddleware(DispatchMiddlewareContext dispatchContext, Func< shim.SetTaskEventId(@event?.EventId ?? -1); } + // Move to the next stage of the DTFx pipeline to trigger the activity shim. return next(); } + /// + /// This DTFx orchestration middleware allows us to initialize Durable Functions-specific context + /// and make the execution happen in a way that plays nice with the Azure Functions execution pipeline. + /// + /// A property bag containing useful DTFx context. + /// The handler for running the next middleware in the pipeline. private async Task OrchestrationMiddleware(DispatchMiddlewareContext dispatchContext, Func next) { TaskOrchestrationShim shim = dispatchContext.GetProperty() as TaskOrchestrationShim; @@ -578,6 +594,12 @@ private async Task OrchestrationMiddleware(DispatchMiddlewareContext dispatchCon await context.RunDeferredTasks(); } + /// + /// This DTFx orchestration middleware (for entities) allows us to add context and set state + /// to the entity shim orchestration before it starts executing the actual entity logic. + /// + /// A property bag containing useful DTFx context. + /// The handler for running the next middleware in the pipeline. private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, Func next) { var entityShim = dispatchContext.GetProperty() as TaskEntityShim; diff --git a/src/WebJobs.Extensions.DurableTask/Listener/TaskActivityShim.cs b/src/WebJobs.Extensions.DurableTask/Listener/TaskActivityShim.cs index 72921acae..3f20fdeb5 100644 --- a/src/WebJobs.Extensions.DurableTask/Listener/TaskActivityShim.cs +++ b/src/WebJobs.Extensions.DurableTask/Listener/TaskActivityShim.cs @@ -2,7 +2,6 @@ // Licensed under the MIT License. See LICENSE in the project root for license information. using System; -using System.Threading; using System.Threading.Tasks; using DurableTask.Core; using DurableTask.Core.Common; @@ -23,6 +22,9 @@ internal class TaskActivityShim : TaskActivity private readonly IApplicationLifetimeWrapper hostServiceLifetime; private readonly string activityName; + /// + /// The DTFx-generated, auto-incrementing ID that uniquely identifies this activity function execution. + /// private int taskEventId = -1; public TaskActivityShim( @@ -129,6 +131,8 @@ public override string Run(TaskContext context, string input) internal void SetTaskEventId(int taskEventId) { + // We don't have the DTFx task event ID at TaskActivityShim-creation time + // so we have to set it here, before DTFx calls the RunAsync method. this.taskEventId = taskEventId; }