diff --git a/dotnet/samples/GettingStarted/FoundryAgents/FoundryAgents_Step01.1_Basics/Program.cs b/dotnet/samples/GettingStarted/FoundryAgents/FoundryAgents_Step01.1_Basics/Program.cs index 0a10930b75..4f370b410e 100644 --- a/dotnet/samples/GettingStarted/FoundryAgents/FoundryAgents_Step01.1_Basics/Program.cs +++ b/dotnet/samples/GettingStarted/FoundryAgents/FoundryAgents_Step01.1_Basics/Program.cs @@ -6,7 +6,6 @@ using Azure.AI.Projects.OpenAI; using Azure.Identity; using Microsoft.Agents.AI; -using Microsoft.Extensions.AI; string endpoint = Environment.GetEnvironmentVariable("AZURE_FOUNDRY_PROJECT_ENDPOINT") ?? throw new InvalidOperationException("AZURE_FOUNDRY_PROJECT_ENDPOINT is not set."); string deploymentName = Environment.GetEnvironmentVariable("AZURE_FOUNDRY_PROJECT_DEPLOYMENT_NAME") ?? "gpt-4o-mini"; diff --git a/dotnet/samples/GettingStarted/Workflows/Observability/ApplicationInsights/Program.cs b/dotnet/samples/GettingStarted/Workflows/Observability/ApplicationInsights/Program.cs index f7894f707a..a05a5cddf6 100644 --- a/dotnet/samples/GettingStarted/Workflows/Observability/ApplicationInsights/Program.cs +++ b/dotnet/samples/GettingStarted/Workflows/Observability/ApplicationInsights/Program.cs @@ -35,8 +35,10 @@ private static async Task Main() using var traceProvider = Sdk.CreateTracerProviderBuilder() .SetResourceBuilder(resourceBuilder) - .AddSource("Microsoft.Agents.AI.Workflows*") .AddSource(SourceName) + // The following source is only required if not specifying + // the `activitySource` in the WithOpenTelemetry call below + .AddSource("Microsoft.Agents.AI.Workflows*") .AddAzureMonitorTraceExporter(options => options.ConnectionString = applicationInsightsConnectionString) .Build(); @@ -51,6 +53,10 @@ private static async Task Main() // Build the workflow by connecting executors sequentially var workflow = new WorkflowBuilder(uppercase) .AddEdge(uppercase, reverse) + .WithOpenTelemetry( + // Set `EnableSensitiveData` to true to include message content in traces + configure: cfg => cfg.EnableSensitiveData = true, + activitySource: s_activitySource) .Build(); // Execute the workflow with input data diff --git a/dotnet/samples/GettingStarted/Workflows/Observability/AspireDashboard/Program.cs b/dotnet/samples/GettingStarted/Workflows/Observability/AspireDashboard/Program.cs index c04a397c55..06eebfba9d 100644 --- a/dotnet/samples/GettingStarted/Workflows/Observability/AspireDashboard/Program.cs +++ b/dotnet/samples/GettingStarted/Workflows/Observability/AspireDashboard/Program.cs @@ -37,8 +37,10 @@ private static async Task Main() using var traceProvider = Sdk.CreateTracerProviderBuilder() .SetResourceBuilder(resourceBuilder) - .AddSource("Microsoft.Agents.AI.Workflows*") .AddSource(SourceName) + // The following source is only required if not specifying + // the `activitySource` in the WithOpenTelemetry call below + .AddSource("Microsoft.Agents.AI.Workflows*") .AddOtlpExporter(options => options.Endpoint = new Uri(otlpEndpoint)) .Build(); @@ -53,6 +55,10 @@ private static async Task Main() // Build the workflow by connecting executors sequentially var workflow = new WorkflowBuilder(uppercase) .AddEdge(uppercase, reverse) + .WithOpenTelemetry( + // Set `EnableSensitiveData` to true to include message content in traces + configure: cfg => cfg.EnableSensitiveData = true, + activitySource: s_activitySource) .Build(); // Execute the workflow with input data diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/DeclarativeWorkflowOptions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/DeclarativeWorkflowOptions.cs index 638bed1f90..ab7794f1b8 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/DeclarativeWorkflowOptions.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/DeclarativeWorkflowOptions.cs @@ -1,5 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. +using System; +using System.Diagnostics; +using Microsoft.Agents.AI.Workflows.Observability; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -41,4 +44,23 @@ public sealed class DeclarativeWorkflowOptions(WorkflowAgentProvider agentProvid /// Gets the used to create loggers for workflow components. /// public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance; + + /// + /// Gets the callback to configure telemetry options. + /// + public Action? ConfigureTelemetry { get; init; } + + /// + /// Gets an optional for telemetry. + /// If provided, the caller retains ownership and is responsible for disposal. + /// If but is set, a shared default + /// activity source named "Microsoft.Agents.AI.Workflows" will be used. + /// + public ActivitySource? TelemetryActivitySource { get; init; } + + /// + /// Gets a value indicating whether telemetry is enabled. + /// Telemetry is enabled when either or is set. + /// + internal bool IsTelemetryEnabled => this.ConfigureTelemetry is not null || this.TelemetryActivitySource is not null; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowActionVisitor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowActionVisitor.cs index 1837142568..a90b1bd9c9 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowActionVisitor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/WorkflowActionVisitor.cs @@ -51,6 +51,14 @@ public Workflow Complete() this._workflowModel.Build(builder); + // Apply telemetry if configured + if (this._workflowOptions.IsTelemetryEnabled) + { + builder.WorkflowBuilder.WithOpenTelemetry( + this._workflowOptions.ConfigureTelemetry, + this._workflowOptions.TelemetryActivitySource); + } + // Build final workflow return builder.WorkflowBuilder.Build(validateOrphans: false); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs index ee303c500b..db643ab441 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs @@ -14,7 +14,7 @@ private async ValueTask FindRouterAsync(IStepTracer? tracer) => await protected internal override async ValueTask ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer) { - using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess); + using var activity = this.StartActivity(); activity? .SetTag(Tags.EdgeGroupType, nameof(DirectEdgeRunner)) .SetTag(Tags.MessageSourceId, this.EdgeData.SourceId) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs index d71fa539b3..309072c32f 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs @@ -14,9 +14,6 @@ internal interface IStatefulEdgeRunner internal abstract class EdgeRunner { - protected static readonly string s_namespace = typeof(EdgeRunner).Namespace!; - protected static readonly ActivitySource s_activitySource = new(s_namespace); - // TODO: Can this be sync? protected internal abstract ValueTask ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer); } @@ -26,4 +23,6 @@ internal abstract class EdgeRunner( { protected IRunnerContext RunContext { get; } = Throw.IfNull(runContext); protected TEdgeData EdgeData { get; } = Throw.IfNull(edgeData); + + protected Activity? StartActivity() => this.RunContext.TelemetryContext.StartEdgeGroupProcessActivity(); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs index 02c0252af3..be80ef34de 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs @@ -19,7 +19,7 @@ internal sealed class FanInEdgeRunner(IRunnerContext runContext, FanInEdgeData e { Debug.Assert(!envelope.IsExternal, "FanIn edges should never be chased from external input"); - using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess); + using var activity = this.StartActivity(); activity? .SetTag(Tags.EdgeGroupType, nameof(FanInEdgeRunner)) .SetTag(Tags.MessageTargetId, this.EdgeData.SinkId); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs index aa6133955d..d1102d6554 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs @@ -13,7 +13,7 @@ internal sealed class FanOutEdgeRunner(IRunnerContext runContext, FanOutEdgeData { protected internal override async ValueTask ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer) { - using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess); + using var activity = this.StartActivity(); activity? .SetTag(Tags.EdgeGroupType, nameof(FanOutEdgeRunner)) .SetTag(Tags.MessageSourceId, this.EdgeData.SourceId); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/IRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/IRunnerContext.cs index e84080c6a7..1c3d167b03 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/IRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/IRunnerContext.cs @@ -3,11 +3,14 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.Observability; namespace Microsoft.Agents.AI.Workflows.Execution; internal interface IRunnerContext : IExternalRequestSink, ISuperStepJoinContext { + WorkflowTelemetryContext TelemetryContext { get; } + ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default); ValueTask SendMessageAsync(string sourceId, object message, string? targetId = null, CancellationToken cancellationToken = default); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs index a7923a7d9b..fce4d9636a 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs @@ -3,6 +3,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.Observability; namespace Microsoft.Agents.AI.Workflows.Execution; @@ -12,6 +13,8 @@ internal interface ISuperStepRunner string StartExecutorId { get; } + WorkflowTelemetryContext TelemetryContext { get; } + bool HasUnservicedRequests { get; } bool HasUnprocessedMessages { get; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs index b47a692113..250a9ee612 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs @@ -13,9 +13,6 @@ namespace Microsoft.Agents.AI.Workflows.Execution; internal sealed class LockstepRunEventStream : IRunEventStream { - private static readonly string s_namespace = typeof(LockstepRunEventStream).Namespace!; - private static readonly ActivitySource s_activitySource = new(s_namespace); - private readonly CancellationTokenSource _stopCancellation = new(); private readonly InputWaiter _inputWaiter = new(); private int _isDisposed; @@ -53,7 +50,7 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe this._stepRunner.OutgoingEvents.EventRaised += OnWorkflowEventAsync; - using Activity? activity = s_activitySource.StartActivity(ActivityNames.WorkflowRun); + using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.RunId, this._stepRunner.RunId); try diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ResponseEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ResponseEdgeRunner.cs index deab3bad52..969509e40b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ResponseEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ResponseEdgeRunner.cs @@ -25,7 +25,7 @@ public static ResponseEdgeRunner ForPort(IRunnerContext runContext, string execu { Debug.Assert(envelope.IsExternal, "Input edges should only be chased from external input"); - using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess); + using var activity = this.StartActivity(); activity? .SetTag(Tags.EdgeGroupType, nameof(ResponseEdgeRunner)) .SetTag(Tags.MessageSourceId, envelope.SourceId) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index ca0cc52641..4cce8df844 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -17,9 +17,6 @@ namespace Microsoft.Agents.AI.Workflows.Execution; /// internal sealed class StreamingRunEventStream : IRunEventStream { - private static readonly string s_namespace = typeof(StreamingRunEventStream).Namespace!; - private static readonly ActivitySource s_activitySource = new(s_namespace); - private readonly Channel _eventChannel; private readonly ISuperStepRunner _stepRunner; private readonly InputWaiter _inputWaiter; @@ -63,7 +60,7 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) // Subscribe to events - they will flow directly to the channel as they're raised this._stepRunner.OutgoingEvents.EventRaised += OnEventRaisedAsync; - using Activity? activity = s_activitySource.StartActivity(ActivityNames.WorkflowRun); + using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.RunId, this._stepRunner.RunId); try diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs index ba9cbac4e1..3dba017fa7 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs @@ -26,9 +26,6 @@ public abstract class Executor : IIdentified /// public string Id { get; } - private static readonly string s_namespace = typeof(Executor).Namespace!; - private static readonly ActivitySource s_activitySource = new(s_namespace); - // TODO: Add overloads for binding with a configuration/options object once the Configured hierarchy goes away. /// @@ -142,13 +139,13 @@ private set /// A ValueTask representing the asynchronous operation, wrapping the output from the executor. /// No handler found for the message type. /// An exception is generated while handling the message. - public async ValueTask ExecuteAsync(object message, TypeId messageType, IWorkflowContext context, CancellationToken cancellationToken = default) + public ValueTask ExecuteAsync(object message, TypeId messageType, IWorkflowContext context, CancellationToken cancellationToken = default) + => this.ExecuteAsync(message, messageType, context, WorkflowTelemetryContext.Disabled, cancellationToken); + + internal async ValueTask ExecuteAsync(object message, TypeId messageType, IWorkflowContext context, WorkflowTelemetryContext telemetryContext, CancellationToken cancellationToken = default) { - using var activity = s_activitySource.StartActivity(ActivityNames.ExecutorProcess, ActivityKind.Internal); - activity?.SetTag(Tags.ExecutorId, this.Id) - .SetTag(Tags.ExecutorType, this.GetType().FullName) - .SetTag(Tags.MessageType, messageType.TypeName) - .CreateSourceLinks(context.TraceContext); + using var activity = telemetryContext.StartExecutorProcessActivity(this.Id, this.GetType().FullName, messageType.TypeName, message); + activity?.CreateSourceLinks(context.TraceContext); await context.AddEventAsync(new ExecutorInvokedEvent(this.Id, message), cancellationToken).ConfigureAwait(false); @@ -183,6 +180,11 @@ private set return null; // Void result. } + // Output is not available if executor does not return anything, in which case + // messages sent in the handlers of this executor will be set in the message + // send activities. + telemetryContext.SetExecutorOutput(activity, result.Result); + // If we had a real return type, raise it as a SendMessage; TODO: Should we have a way to disable this behaviour? if (result.Result is not null && this.Options.AutoSendMessageHandlerResultObject) { diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs index 644ab3ec82..58e1890eed 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs @@ -9,6 +9,7 @@ using System.Threading.Tasks; using Microsoft.Agents.AI.Workflows.Checkpointing; using Microsoft.Agents.AI.Workflows.Execution; +using Microsoft.Agents.AI.Workflows.Observability; using Microsoft.Shared.Diagnostics; namespace Microsoft.Agents.AI.Workflows.InProc; @@ -70,6 +71,9 @@ private InProcessRunner(Workflow workflow, ICheckpointManager? checkpointManager /// public string StartExecutorId { get; } + /// + public WorkflowTelemetryContext TelemetryContext => this.Workflow.TelemetryContext; + private readonly HashSet _knownValidInputTypes; public async ValueTask IsValidInputTypeAsync(Type messageType, CancellationToken cancellationToken = default) { @@ -201,6 +205,7 @@ await executor.ExecuteAsync( envelope.Message, envelope.MessageType, this.RunContext.BindWorkflowContext(receiverId, envelope.TraceContext), + this.TelemetryContext, cancellationToken ).ConfigureAwait(false); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs index ac0baf157f..419d46cd1b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs @@ -70,6 +70,7 @@ public InProcessRunnerContext( this.ConcurrentRunsEnabled = enableConcurrentRuns; this.OutgoingEvents = outgoingEvents; } + public WorkflowTelemetryContext TelemetryContext => this._workflow.TelemetryContext; public IExternalRequestSink RegisterPort(string executorId, RequestPort port) { @@ -195,12 +196,10 @@ public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken ca return this.OutgoingEvents.EnqueueAsync(workflowEvent); } - private static readonly string s_namespace = typeof(IWorkflowContext).Namespace!; - private static readonly ActivitySource s_activitySource = new(s_namespace); - public async ValueTask SendMessageAsync(string sourceId, object message, string? targetId = null, CancellationToken cancellationToken = default) { - using Activity? activity = s_activitySource.StartActivity(ActivityNames.MessageSend, ActivityKind.Producer); + using Activity? activity = this._workflow.TelemetryContext.StartMessageSendActivity(sourceId, targetId, message); + // Create a carrier for trace context propagation var traceContext = activity is null ? null : new Dictionary(); if (traceContext is not null) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs index a845915a96..9d2912ecd3 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs @@ -5,7 +5,7 @@ namespace Microsoft.Agents.AI.Workflows.Observability; internal static class ActivityNames { public const string WorkflowBuild = "workflow.build"; - public const string WorkflowRun = "workflow.run"; + public const string WorkflowRun = "workflow_invoke"; public const string MessageSend = "message.send"; public const string ExecutorProcess = "executor.process"; public const string EdgeGroupProcess = "edge_group.process"; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs index 9acba99eea..4b40e46f8c 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs @@ -14,7 +14,10 @@ internal static class Tags public const string RunId = "run.id"; public const string ExecutorId = "executor.id"; public const string ExecutorType = "executor.type"; + public const string ExecutorInput = "executor.input"; + public const string ExecutorOutput = "executor.output"; public const string MessageType = "message.type"; + public const string MessageContent = "message.content"; public const string EdgeGroupType = "edge_group.type"; public const string MessageSourceId = "message.source_id"; public const string MessageTargetId = "message.target_id"; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs new file mode 100644 index 0000000000..e4b8d7a851 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs @@ -0,0 +1,216 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Text.Json; + +namespace Microsoft.Agents.AI.Workflows.Observability; + +/// +/// Internal context for workflow telemetry, holding the enabled state and configuration options. +/// +internal sealed class WorkflowTelemetryContext +{ + private const string DefaultSourceName = "Microsoft.Agents.AI.Workflows"; + private static readonly ActivitySource s_defaultActivitySource = new(DefaultSourceName); + + /// + /// Gets a shared instance representing disabled telemetry. + /// + public static WorkflowTelemetryContext Disabled { get; } = new(); + + /// + /// Gets a value indicating whether telemetry is enabled. + /// + public bool IsEnabled { get; } + + /// + /// Gets the telemetry options. + /// + public WorkflowTelemetryOptions Options { get; } + + /// + /// Gets the activity source used for creating telemetry spans. + /// + public ActivitySource ActivitySource { get; } + + private WorkflowTelemetryContext() + { + this.IsEnabled = false; + this.Options = new WorkflowTelemetryOptions(); + this.ActivitySource = s_defaultActivitySource; + } + + /// + /// Initializes a new instance of the class with telemetry enabled. + /// + /// The telemetry options. + /// + /// An optional activity source to use. If provided, this activity source will be used directly + /// and the caller retains ownership (responsible for disposal). If , the + /// shared default activity source will be used. + /// + public WorkflowTelemetryContext(WorkflowTelemetryOptions options, ActivitySource? activitySource = null) + { + this.IsEnabled = true; + this.Options = options; + this.ActivitySource = activitySource ?? s_defaultActivitySource; + } + + /// + /// Starts an activity if telemetry is enabled, otherwise returns null. + /// + /// The activity name. + /// The activity kind. + /// An activity if telemetry is enabled and the activity is sampled, otherwise null. + public Activity? StartActivity(string name, ActivityKind kind = ActivityKind.Internal) + { + if (!this.IsEnabled) + { + return null; + } + + return this.ActivitySource.StartActivity(name, kind); + } + + /// + /// Starts a workflow build activity if enabled. + /// + /// An activity if workflow build telemetry is enabled, otherwise null. + public Activity? StartWorkflowBuildActivity() + { + if (!this.IsEnabled || this.Options.DisableWorkflowBuild) + { + return null; + } + + return this.ActivitySource.StartActivity(ActivityNames.WorkflowBuild); + } + + /// + /// Starts a workflow run activity if enabled. + /// + /// An activity if workflow run telemetry is enabled, otherwise null. + public Activity? StartWorkflowRunActivity() + { + if (!this.IsEnabled || this.Options.DisableWorkflowRun) + { + return null; + } + + return this.ActivitySource.StartActivity(ActivityNames.WorkflowRun); + } + + /// + /// Starts an executor process activity if enabled, with all standard tags set. + /// + /// The executor identifier. + /// The executor type name. + /// The message type name. + /// The input message. Logged only when is true. + /// An activity if executor process telemetry is enabled, otherwise null. + public Activity? StartExecutorProcessActivity(string executorId, string? executorType, string messageType, object? message) + { + if (!this.IsEnabled || this.Options.DisableExecutorProcess) + { + return null; + } + + Activity? activity = this.ActivitySource.StartActivity(ActivityNames.ExecutorProcess + " " + executorId); + if (activity is null) + { + return null; + } + + activity.SetTag(Tags.ExecutorId, executorId) + .SetTag(Tags.ExecutorType, executorType) + .SetTag(Tags.MessageType, messageType); + + if (this.Options.EnableSensitiveData) + { + activity.SetTag(Tags.ExecutorInput, SerializeForTelemetry(message)); + } + + return activity; + } + + /// + /// Sets the executor output tag on an activity when sensitive data logging is enabled. + /// + /// The activity to set the output on. + /// The output value to log. + public void SetExecutorOutput(Activity? activity, object? output) + { + if (activity is not null && this.Options.EnableSensitiveData) + { + activity.SetTag(Tags.ExecutorOutput, SerializeForTelemetry(output)); + } + } + + /// + /// Starts an edge group process activity if enabled. + /// + /// An activity if edge group process telemetry is enabled, otherwise null. + public Activity? StartEdgeGroupProcessActivity() + { + if (!this.IsEnabled || this.Options.DisableEdgeGroupProcess) + { + return null; + } + + return this.ActivitySource.StartActivity(ActivityNames.EdgeGroupProcess); + } + + /// + /// Starts a message send activity if enabled, with all standard tags set. + /// + /// The source executor identifier. + /// The target executor identifier, if any. + /// The message being sent. Logged only when is true. + /// An activity if message send telemetry is enabled, otherwise null. + public Activity? StartMessageSendActivity(string sourceId, string? targetId, object? message) + { + if (!this.IsEnabled || this.Options.DisableMessageSend) + { + return null; + } + + Activity? activity = this.ActivitySource.StartActivity(ActivityNames.MessageSend, ActivityKind.Producer); + if (activity is null) + { + return null; + } + + activity.SetTag(Tags.MessageSourceId, sourceId); + if (targetId is not null) + { + activity.SetTag(Tags.MessageTargetId, targetId); + } + + if (this.Options.EnableSensitiveData) + { + activity.SetTag(Tags.MessageContent, SerializeForTelemetry(message)); + } + + return activity; + } + + [UnconditionalSuppressMessage("ReflectionAnalysis", "IL3050:RequiresDynamicCode", Justification = "Telemetry serialization is optional and only used when explicitly enabled.")] + [UnconditionalSuppressMessage("Trimming", "IL2026:Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access", Justification = "Telemetry serialization is optional and only used when explicitly enabled.")] + private static string? SerializeForTelemetry(object? value) + { + if (value is null) + { + return null; + } + + try + { + return JsonSerializer.Serialize(value, value.GetType()); + } + catch (JsonException) + { + return $"[Unserializable: {value.GetType().FullName}]"; + } + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryOptions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryOptions.cs new file mode 100644 index 0000000000..b32f0c0f66 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryOptions.cs @@ -0,0 +1,68 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.Workflows.Observability; + +/// +/// Configuration options for workflow telemetry. +/// +public sealed class WorkflowTelemetryOptions +{ + /// + /// Gets or sets a value indicating whether potentially sensitive information should be included in telemetry. + /// + /// + /// if potentially sensitive information should be included in telemetry; + /// if telemetry shouldn't include raw inputs and outputs. + /// The default value is . + /// + /// + /// By default, telemetry includes metadata but not raw inputs and outputs, + /// such as message content and executor data. + /// + public bool EnableSensitiveData { get; set; } + + /// + /// Gets or sets a value indicating whether workflow build activities should be disabled. + /// + /// + /// to disable workflow.build activities; + /// to enable them. The default value is . + /// + public bool DisableWorkflowBuild { get; set; } + + /// + /// Gets or sets a value indicating whether workflow run activities should be disabled. + /// + /// + /// to disable workflow_invoke activities; + /// to enable them. The default value is . + /// + public bool DisableWorkflowRun { get; set; } + + /// + /// Gets or sets a value indicating whether executor process activities should be disabled. + /// + /// + /// to disable executor.process activities; + /// to enable them. The default value is . + /// + public bool DisableExecutorProcess { get; set; } + + /// + /// Gets or sets a value indicating whether edge group process activities should be disabled. + /// + /// + /// to disable edge_group.process activities; + /// to enable them. The default value is . + /// + public bool DisableEdgeGroupProcess { get; set; } + + /// + /// Gets or sets a value indicating whether message send activities should be disabled. + /// + /// + /// to disable message.send activities; + /// to enable them. The default value is . + /// + public bool DisableMessageSend { get; set; } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs new file mode 100644 index 0000000000..ffa0f0362d --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/OpenTelemetryWorkflowBuilderExtensions.cs @@ -0,0 +1,69 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Diagnostics; +using Microsoft.Agents.AI.Workflows.Observability; +using Microsoft.Shared.Diagnostics; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Provides extension methods for adding OpenTelemetry instrumentation to instances. +/// +public static class OpenTelemetryWorkflowBuilderExtensions +{ + /// + /// Enables OpenTelemetry instrumentation for the workflow, providing comprehensive observability for workflow operations. + /// + /// The to which OpenTelemetry support will be added. + /// + /// An optional callback that provides additional configuration of the instance. + /// This allows for fine-tuning telemetry behavior such as enabling sensitive data collection. + /// + /// + /// An optional to use for telemetry. If provided, this activity source will be used + /// directly and the caller retains ownership (responsible for disposal). If , a shared + /// default activity source named "Microsoft.Agents.AI.Workflows" will be used. + /// + /// The with OpenTelemetry instrumentation enabled, enabling method chaining. + /// is . + /// + /// + /// This extension adds comprehensive telemetry capabilities to workflows, including: + /// + /// Distributed tracing of workflow execution + /// Executor invocation and processing spans + /// Edge routing and message delivery spans + /// Workflow build and validation spans + /// Error tracking and exception details + /// + /// + /// + /// By default, workflow telemetry is disabled. Call this method to enable telemetry collection. + /// + /// + /// + /// + /// var workflow = new WorkflowBuilder(startExecutor) + /// .AddEdge(executor1, executor2) + /// .WithOpenTelemetry(cfg => cfg.EnableSensitiveData = true) + /// .Build(); + /// + /// + public static WorkflowBuilder WithOpenTelemetry( + this WorkflowBuilder builder, + Action? configure = null, + ActivitySource? activitySource = null) + { + Throw.IfNull(builder); + + WorkflowTelemetryOptions options = new(); + configure?.Invoke(options); + + WorkflowTelemetryContext context = new(options, activitySource); + + builder.SetTelemetryContext(context); + + return builder; + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs index 8b77da7fc2..6b26a403cf 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Workflow.cs @@ -8,6 +8,7 @@ using System.Threading.Tasks; using Microsoft.Agents.AI.Workflows.Checkpointing; using Microsoft.Agents.AI.Workflows.Execution; +using Microsoft.Agents.AI.Workflows.Observability; using Microsoft.Shared.Diagnostics; namespace Microsoft.Agents.AI.Workflows; @@ -76,6 +77,11 @@ public Dictionary ReflectExecutors() /// public string? Description { get; internal init; } + /// + /// Gets the telemetry context for the workflow. + /// + internal WorkflowTelemetryContext TelemetryContext { get; } + internal bool AllowConcurrent => this.ExecutorBindings.Values.All(registration => registration.SupportsConcurrentSharedExecution); internal IEnumerable NonConcurrentExecutorIds => @@ -88,11 +94,13 @@ public Dictionary ReflectExecutors() /// The unique identifier of the starting executor for the workflow. Cannot be null. /// Optional human-readable name for the workflow. /// Optional description of what the workflow does. - internal Workflow(string startExecutorId, string? name = null, string? description = null) + /// Optional telemetry context for the workflow. + internal Workflow(string startExecutorId, string? name = null, string? description = null, WorkflowTelemetryContext? telemetryContext = null) { this.StartExecutorId = Throw.IfNull(startExecutorId); this.Name = name; this.Description = description; + this.TelemetryContext = telemetryContext ?? WorkflowTelemetryContext.Disabled; } private bool _needsReset; diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs index 4b6980d433..36a3468e2d 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs @@ -38,9 +38,7 @@ private readonly record struct EdgeConnection(string SourceId, string TargetId) private readonly string _startExecutorId; private string? _name; private string? _description; - - private static readonly string s_namespace = typeof(WorkflowBuilder).Namespace!; - private static readonly ActivitySource s_activitySource = new(s_namespace); + private WorkflowTelemetryContext _telemetryContext = WorkflowTelemetryContext.Disabled; /// /// Initializes a new instance of the WorkflowBuilder class with the specified starting executor. @@ -137,6 +135,15 @@ public WorkflowBuilder WithDescription(string description) return this; } + /// + /// Sets the telemetry context for the workflow. + /// + /// The telemetry context to use. + internal void SetTelemetryContext(WorkflowTelemetryContext context) + { + this._telemetryContext = Throw.IfNull(context); + } + /// /// Binds the specified executor (via registration) to the workflow, allowing it to participate in workflow execution. /// @@ -563,7 +570,7 @@ private Workflow BuildInternal(bool validateOrphans, Activity? activity = null) activity?.AddEvent(new ActivityEvent(EventNames.BuildValidationCompleted)); - var workflow = new Workflow(this._startExecutorId, this._name, this._description) + var workflow = new Workflow(this._startExecutorId, this._name, this._description, this._telemetryContext) { ExecutorBindings = this._executorBindings, Edges = this._edges, @@ -601,7 +608,7 @@ private Workflow BuildInternal(bool validateOrphans, Activity? activity = null) /// or if the start executor is not bound. public Workflow Build(bool validateOrphans = true) { - using Activity? activity = s_activitySource.StartActivity(ActivityNames.WorkflowBuild); + using Activity? activity = this._telemetryContext.StartWorkflowBuildActivity(); var workflow = this.BuildInternal(validateOrphans, activity); diff --git a/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosDBCollectionFixture.cs b/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosDBCollectionFixture.cs index 195c433de5..d6825ad30d 100644 --- a/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosDBCollectionFixture.cs +++ b/dotnet/tests/Microsoft.Agents.AI.CosmosNoSql.UnitTests/CosmosDBCollectionFixture.cs @@ -1,7 +1,5 @@ // Copyright (c) Microsoft. All rights reserved. -using Xunit; - namespace Microsoft.Agents.AI.CosmosNoSql.UnitTests; /// diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests/DeclarativeWorkflowOptionsTest.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests/DeclarativeWorkflowOptionsTest.cs new file mode 100644 index 0000000000..73c4792e32 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.Declarative.UnitTests/DeclarativeWorkflowOptionsTest.cs @@ -0,0 +1,259 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.Observability; +using Microsoft.Extensions.AI; +using Microsoft.Extensions.Logging.Abstractions; +using Moq; + +namespace Microsoft.Agents.AI.Workflows.Declarative.UnitTests; + +/// +/// Tests for telemetry configuration. +/// +[Collection("DeclarativeWorkflowOptionsTest")] +public sealed class DeclarativeWorkflowOptionsTest : IDisposable +{ + // These constants mirror Microsoft.Agents.AI.Workflows.Observability.ActivityNames + // which is internal and not accessible from this test project. + private const string WorkflowBuildActivityName = "workflow.build"; + private const string WorkflowRunActivityName = "workflow_invoke"; + + // The default activity source name used by the workflow telemetry context. + private const string DefaultTelemetrySourceName = "Microsoft.Agents.AI.Workflows"; + + private const string SimpleWorkflowYaml = """ + kind: Workflow + trigger: + kind: OnConversationStart + id: test_workflow + actions: + - kind: EndConversation + id: end_all + """; + + private readonly ActivitySource _activitySource = new("TestSource"); + private readonly ActivityListener _activityListener; + private readonly ConcurrentBag _capturedActivities = []; + + public DeclarativeWorkflowOptionsTest() + { + this._activityListener = new ActivityListener + { + ShouldListenTo = source => + source.Name == DefaultTelemetrySourceName || + source.Name == "TestSource", + Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllData, + ActivityStarted = activity => this._capturedActivities.Add(activity), + }; + ActivitySource.AddActivityListener(this._activityListener); + } + + public void Dispose() + { + this._activityListener.Dispose(); + this._activitySource.Dispose(); + } + + [Fact] + public void ConfigureTelemetry_DefaultIsNull() + { + // Arrange + Mock mockProvider = CreateMockProvider(); + + // Act + DeclarativeWorkflowOptions options = new(mockProvider.Object); + + // Assert + Assert.Null(options.ConfigureTelemetry); + } + + [Fact] + public void ConfigureTelemetry_CanBeSet() + { + // Arrange + Mock mockProvider = CreateMockProvider(); + bool callbackInvoked = false; + + // Act + DeclarativeWorkflowOptions options = new(mockProvider.Object) + { + ConfigureTelemetry = opt => + { + callbackInvoked = true; + opt.EnableSensitiveData = true; + } + }; + + // Assert + Assert.NotNull(options.ConfigureTelemetry); + WorkflowTelemetryOptions telemetryOptions = new(); + options.ConfigureTelemetry(telemetryOptions); + Assert.True(callbackInvoked); + Assert.True(telemetryOptions.EnableSensitiveData); + } + + [Fact] + public void TelemetryActivitySource_DefaultIsNull() + { + // Arrange + Mock mockProvider = CreateMockProvider(); + + // Act + DeclarativeWorkflowOptions options = new(mockProvider.Object); + + // Assert + Assert.Null(options.TelemetryActivitySource); + } + + [Fact] + public void TelemetryActivitySource_CanBeSet() + { + // Arrange + Mock mockProvider = CreateMockProvider(); + + // Act + DeclarativeWorkflowOptions options = new(mockProvider.Object) + { + TelemetryActivitySource = this._activitySource + }; + + // Assert + Assert.Same(this._activitySource, options.TelemetryActivitySource); + } + + [Fact] + public async Task BuildWorkflow_WithDefaultTelemetry_AppliesTelemetryAsync() + { + // Arrange + using Activity testActivity = new Activity("DefaultTelemetryTest").Start()!; + Mock mockProvider = CreateMockProvider(); + DeclarativeWorkflowOptions options = new(mockProvider.Object) + { + ConfigureTelemetry = _ => { }, + LoggerFactory = NullLoggerFactory.Instance + }; + + // Act + using StringReader reader = new(SimpleWorkflowYaml); + Workflow workflow = DeclarativeWorkflowBuilder.Build(reader, options); + + await using Run run = await InProcessExecution.RunAsync(workflow, "test input"); + + // Assert + Activity[] capturedActivities = this._capturedActivities + .Where(a => a.RootId == testActivity.RootId && a.Source.Name == DefaultTelemetrySourceName) + .ToArray(); + + Assert.NotEmpty(capturedActivities); + Assert.Contains(capturedActivities, a => a.OperationName.StartsWith(WorkflowBuildActivityName, StringComparison.Ordinal)); + Assert.Contains(capturedActivities, a => a.OperationName.StartsWith(WorkflowRunActivityName, StringComparison.Ordinal)); + } + + [Fact] + public async Task BuildWorkflow_WithTelemetryActivitySource_AppliesTelemetryAsync() + { + // Arrange + using Activity testActivity = new Activity("TelemetryActivitySourceTest").Start()!; + Mock mockProvider = CreateMockProvider(); + DeclarativeWorkflowOptions options = new(mockProvider.Object) + { + TelemetryActivitySource = this._activitySource, + LoggerFactory = NullLoggerFactory.Instance + }; + + // Act + using StringReader reader = new(SimpleWorkflowYaml); + Workflow workflow = DeclarativeWorkflowBuilder.Build(reader, options); + + await using Run run = await InProcessExecution.RunAsync(workflow, "test input"); + + // Assert + Activity[] capturedActivities = this._capturedActivities + .Where(a => a.RootId == testActivity.RootId && a.Source.Name == "TestSource") + .ToArray(); + + Assert.NotEmpty(capturedActivities); + Assert.All(capturedActivities, a => Assert.Equal("TestSource", a.Source.Name)); + } + + [Fact] + public async Task BuildWorkflow_WithConfigureTelemetry_AppliesConfigurationAsync() + { + // Arrange + using Activity testActivity = new Activity("ConfigureTelemetryTest").Start()!; + Mock mockProvider = CreateMockProvider(); + bool configureInvoked = false; + DeclarativeWorkflowOptions options = new(mockProvider.Object) + { + ConfigureTelemetry = opt => + { + configureInvoked = true; + opt.EnableSensitiveData = true; + }, + LoggerFactory = NullLoggerFactory.Instance + }; + + // Act + using StringReader reader = new(SimpleWorkflowYaml); + Workflow workflow = DeclarativeWorkflowBuilder.Build(reader, options); + + await using Run run = await InProcessExecution.RunAsync(workflow, "test input"); + + // Assert + Assert.True(configureInvoked); + + Activity[] capturedActivities = this._capturedActivities + .Where(a => a.RootId == testActivity.RootId && a.Source.Name == DefaultTelemetrySourceName) + .ToArray(); + + Assert.NotEmpty(capturedActivities); + Assert.Contains(capturedActivities, a => a.OperationName.StartsWith(WorkflowBuildActivityName, StringComparison.Ordinal)); + Assert.Contains(capturedActivities, a => a.OperationName.StartsWith(WorkflowRunActivityName, StringComparison.Ordinal)); + } + + [Fact] + public async Task BuildWorkflow_WithoutTelemetry_DoesNotCreateActivitiesAsync() + { + // Arrange + using Activity testActivity = new Activity("NoTelemetryTest").Start()!; + Mock mockProvider = CreateMockProvider(); + DeclarativeWorkflowOptions options = new(mockProvider.Object) + { + LoggerFactory = NullLoggerFactory.Instance + }; + + // Act + using StringReader reader = new(SimpleWorkflowYaml); + Workflow workflow = DeclarativeWorkflowBuilder.Build(reader, options); + + await using Run run = await InProcessExecution.RunAsync(workflow, "test input"); + + // Assert - No workflow activities should be created when telemetry is disabled + Activity[] capturedActivities = this._capturedActivities + .Where(a => a.RootId == testActivity.RootId && + (a.OperationName.StartsWith(WorkflowBuildActivityName, StringComparison.Ordinal) || + a.OperationName.StartsWith(WorkflowRunActivityName, StringComparison.Ordinal))) + .ToArray(); + + Assert.Empty(capturedActivities); + } + + private static Mock CreateMockProvider() + { + Mock mockAgentProvider = new(MockBehavior.Strict); + mockAgentProvider + .Setup(provider => provider.CreateConversationAsync(It.IsAny())) + .Returns(() => Task.FromResult(Guid.NewGuid().ToString("N"))); + mockAgentProvider + .Setup(provider => provider.CreateMessageAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ChatMessage(ChatRole.Assistant, "Test response"))); + return mockAgentProvider; + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs index 8ab6280b46..e7a99d5ca2 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs @@ -67,7 +67,7 @@ private static Workflow CreateWorkflow() WorkflowBuilder builder = new(uppercase); builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse); - return builder.Build(); + return builder.WithOpenTelemetry().Build(); } private static Dictionary GetExpectedActivityNameCounts() => @@ -111,8 +111,6 @@ private async Task TestWorkflowEndToEndActivitiesAsync(string executionEnvironme Run run = await executionEnvironment.RunAsync(workflow, "Hello, World!"); await run.DisposeAsync(); - await Task.Delay(100); // Allow time for activities to be captured - // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); capturedActivities.Should().HaveCount(8, "Exactly 8 activities should be created."); @@ -122,12 +120,12 @@ private async Task TestWorkflowEndToEndActivitiesAsync(string executionEnvironme { var activityName = kvp.Key; var expectedCount = kvp.Value; - var actualCount = capturedActivities.Count(a => a.OperationName == activityName); + var actualCount = capturedActivities.Count(a => a.OperationName.StartsWith(activityName, StringComparison.Ordinal)); actualCount.Should().Be(expectedCount, $"Activity '{activityName}' should occur {expectedCount} times."); } // Verify WorkflowRun activity events include workflow lifecycle events - var workflowRunActivity = capturedActivities.First(a => a.OperationName == ActivityNames.WorkflowRun); + var workflowRunActivity = capturedActivities.First(a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)); var activityEvents = workflowRunActivity.Events.ToList(); activityEvents.Should().Contain(e => e.Name == EventNames.WorkflowStarted, "activity should have workflow started event"); activityEvents.Should().Contain(e => e.Name == EventNames.WorkflowCompleted, "activity should have workflow completed event"); @@ -166,8 +164,6 @@ public async Task CreatesWorkflowActivities_WithCorrectNameAsync() // Act CreateWorkflow(); - await Task.Delay(100); // Allow time for activities to be captured - // Assert var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); capturedActivities.Should().HaveCount(1, "Exactly 1 activity should be created."); @@ -183,4 +179,325 @@ public async Task CreatesWorkflowActivities_WithCorrectNameAsync() tags.Should().ContainKey(Tags.WorkflowId); tags.Should().ContainKey(Tags.WorkflowDefinition); } + + [Fact] + public async Task TelemetryDisabledByDefault_CreatesNoActivitiesAsync() + { + // Arrange + // Create a test activity to correlate captured activities + using var testActivity = new Activity("ObservabilityTest").Start(); + + // Act - Build workflow WITHOUT calling WithOpenTelemetry() + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + WorkflowBuilder builder = new(uppercase); + builder.Build(); // No WithOpenTelemetry() call + // Assert - No activities should be created + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + capturedActivities.Should().BeEmpty("No activities should be created when telemetry is disabled (default)."); + } + + [Fact] + public async Task WithOpenTelemetry_UsesProvidedActivitySourceAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + using var userActivitySource = new ActivitySource("UserProvidedSource"); + + // Set up a separate listener for the user-provided source + ConcurrentBag userActivities = []; + using var userListener = new ActivityListener + { + ShouldListenTo = source => source.Name == "UserProvidedSource", + Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllData, + ActivityStarted = activity => userActivities.Add(activity), + }; + ActivitySource.AddActivityListener(userListener); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + // Act + WorkflowBuilder builder = new(uppercase); + var workflow = builder.WithOpenTelemetry(activitySource: userActivitySource).Build(); + + Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello"); + await run.DisposeAsync(); + + // Assert + var capturedActivities = userActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + capturedActivities.Should().NotBeEmpty("Activities should be created with user-provided ActivitySource."); + capturedActivities.Should().OnlyContain( + a => a.Source.Name == "UserProvidedSource", + "All activities should come from the user-provided ActivitySource."); + } + + [Fact] + public async Task DisableWorkflowBuild_PreventsWorkflowBuildActivityAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + // Act + WorkflowBuilder builder = new(uppercase); + builder.WithOpenTelemetry(configure: opts => opts.DisableWorkflowBuild = true).Build(); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + capturedActivities.Should().NotContain( + a => a.OperationName.StartsWith(ActivityNames.WorkflowBuild, StringComparison.Ordinal), + "WorkflowBuild activity should be disabled."); + } + + [Fact] + public async Task DisableWorkflowRun_PreventsWorkflowRunActivityAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + // Act + WorkflowBuilder builder = new(uppercase); + builder.WithOutputFrom(uppercase); + var workflow = builder.WithOpenTelemetry(configure: opts => opts.DisableWorkflowRun = true).Build(); + + Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello"); + await run.DisposeAsync(); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + capturedActivities.Should().NotContain( + a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal), + "WorkflowRun activity should be disabled."); + capturedActivities.Should().Contain( + a => a.OperationName.StartsWith(ActivityNames.WorkflowBuild, StringComparison.Ordinal), + "Other activities should still be created."); + } + + [Fact] + public async Task DisableExecutorProcess_PreventsExecutorProcessActivityAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + // Act + WorkflowBuilder builder = new(uppercase); + builder.WithOutputFrom(uppercase); + var workflow = builder.WithOpenTelemetry(configure: opts => opts.DisableExecutorProcess = true).Build(); + + Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello"); + await run.DisposeAsync(); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + capturedActivities.Should().NotContain( + a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal), + "ExecutorProcess activity should be disabled."); + capturedActivities.Should().Contain( + a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal), + "Other activities should still be created."); + } + + [Fact] + public async Task DisableEdgeGroupProcess_PreventsEdgeGroupProcessActivityAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + var workflow = CreateWorkflowWithDisabledEdges(); + + // Act + Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello"); + await run.DisposeAsync(); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + capturedActivities.Should().NotContain( + a => a.OperationName.StartsWith(ActivityNames.EdgeGroupProcess, StringComparison.Ordinal), + "EdgeGroupProcess activity should be disabled."); + capturedActivities.Should().Contain( + a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal), + "Other activities should still be created."); + } + + [Fact] + public async Task DisableMessageSend_PreventsMessageSendActivityAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + var workflow = CreateWorkflowWithDisabledMessages(); + + // Act + Run run = await InProcessExecution.Default.RunAsync(workflow, "Hello"); + await run.DisposeAsync(); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + capturedActivities.Should().NotContain( + a => a.OperationName.StartsWith(ActivityNames.MessageSend, StringComparison.Ordinal), + "MessageSend activity should be disabled."); + capturedActivities.Should().Contain( + a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal), + "Other activities should still be created."); + } + + private static Workflow CreateWorkflowWithDisabledEdges() + { + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + Func reverseFunc = s => new string(s.Reverse().ToArray()); + var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor"); + + WorkflowBuilder builder = new(uppercase); + builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse); + + return builder.WithOpenTelemetry(configure: opts => opts.DisableEdgeGroupProcess = true).Build(); + } + + private static Workflow CreateWorkflowWithDisabledMessages() + { + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + Func reverseFunc = s => new string(s.Reverse().ToArray()); + var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor"); + + WorkflowBuilder builder = new(uppercase); + builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse); + + return builder.WithOpenTelemetry(configure: opts => opts.DisableMessageSend = true).Build(); + } + + [Fact] + public async Task EnableSensitiveData_LogsExecutorInputAndOutputAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + // Act + WorkflowBuilder builder = new(uppercase); + builder.WithOutputFrom(uppercase); + var workflow = builder.WithOpenTelemetry(configure: opts => opts.EnableSensitiveData = true).Build(); + + Run run = await InProcessExecution.Default.RunAsync(workflow, "hello"); + await run.DisposeAsync(); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + var executorActivity = capturedActivities.FirstOrDefault( + a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal)); + + executorActivity.Should().NotBeNull("ExecutorProcess activity should be created."); + + var tags = executorActivity!.Tags.ToDictionary(t => t.Key, t => t.Value); + tags.Should().ContainKey(Tags.ExecutorInput, "Input should be logged when EnableSensitiveData is true."); + tags.Should().ContainKey(Tags.ExecutorOutput, "Output should be logged when EnableSensitiveData is true."); + tags[Tags.ExecutorInput].Should().Contain("hello", "Input should contain the input value."); + tags[Tags.ExecutorOutput].Should().Contain("HELLO", "Output should contain the transformed value."); + } + + [Fact] + public async Task EnableSensitiveData_Disabled_DoesNotLogInputOutputAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + // Act - EnableSensitiveData is false by default + WorkflowBuilder builder = new(uppercase); + builder.WithOutputFrom(uppercase); + var workflow = builder.WithOpenTelemetry().Build(); + + Run run = await InProcessExecution.Default.RunAsync(workflow, "hello"); + await run.DisposeAsync(); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + var executorActivity = capturedActivities.FirstOrDefault( + a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal)); + + executorActivity.Should().NotBeNull("ExecutorProcess activity should be created."); + + var tags = executorActivity!.Tags.ToDictionary(t => t.Key, t => t.Value); + tags.Should().NotContainKey(Tags.ExecutorInput, "Input should NOT be logged when EnableSensitiveData is false."); + tags.Should().NotContainKey(Tags.ExecutorOutput, "Output should NOT be logged when EnableSensitiveData is false."); + } + + [Fact] + public async Task EnableSensitiveData_LogsMessageSendContentAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + Func reverseFunc = s => new string(s.Reverse().ToArray()); + var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor"); + + // Act + WorkflowBuilder builder = new(uppercase); + builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse); + var workflow = builder.WithOpenTelemetry(configure: opts => opts.EnableSensitiveData = true).Build(); + + Run run = await InProcessExecution.Default.RunAsync(workflow, "hello"); + await run.DisposeAsync(); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + var messageSendActivity = capturedActivities.FirstOrDefault( + a => a.OperationName.StartsWith(ActivityNames.MessageSend, StringComparison.Ordinal)); + + messageSendActivity.Should().NotBeNull("MessageSend activity should be created."); + + var tags = messageSendActivity!.Tags.ToDictionary(t => t.Key, t => t.Value); + tags.Should().ContainKey(Tags.MessageContent, "Message content should be logged when EnableSensitiveData is true."); + tags.Should().ContainKey(Tags.MessageSourceId, "Source ID should be logged."); + } + + [Fact] + public async Task EnableSensitiveData_Disabled_DoesNotLogMessageContentAsync() + { + // Arrange + using var testActivity = new Activity("ObservabilityTest").Start(); + + Func uppercaseFunc = s => s.ToUpperInvariant(); + var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); + + Func reverseFunc = s => new string(s.Reverse().ToArray()); + var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor"); + + // Act - EnableSensitiveData is false by default + WorkflowBuilder builder = new(uppercase); + builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse); + var workflow = builder.WithOpenTelemetry().Build(); + + Run run = await InProcessExecution.Default.RunAsync(workflow, "hello"); + await run.DisposeAsync(); + + // Assert + var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList(); + var messageSendActivity = capturedActivities.FirstOrDefault( + a => a.OperationName.StartsWith(ActivityNames.MessageSend, StringComparison.Ordinal)); + + messageSendActivity.Should().NotBeNull("MessageSend activity should be created."); + + var tags = messageSendActivity!.Tags.ToDictionary(t => t.Key, t => t.Value); + tags.Should().NotContainKey(Tags.MessageContent, "Message content should NOT be logged when EnableSensitiveData is false."); + tags.Should().ContainKey(Tags.MessageSourceId, "Source ID should still be logged."); + } } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs index 390f99ae05..9782e68f4f 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs @@ -6,6 +6,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Agents.AI.Workflows.Execution; +using Microsoft.Agents.AI.Workflows.Observability; namespace Microsoft.Agents.AI.Workflows.UnitTests; @@ -133,6 +134,8 @@ ValueTask IRunnerContext.AdvanceAsync(CancellationToken cancellatio public bool WithCheckpointing => false; public bool ConcurrentRunsEnabled => false; + WorkflowTelemetryContext IRunnerContext.TelemetryContext => WorkflowTelemetryContext.Disabled; + ValueTask IRunnerContext.EnsureExecutorAsync(string executorId, IStepTracer? tracer, CancellationToken cancellationToken) => new(this.Executors[executorId]);