Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using Azure.AI.Projects.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

string endpoint = Environment.GetEnvironmentVariable("AZURE_FOUNDRY_PROJECT_ENDPOINT") ?? throw new InvalidOperationException("AZURE_FOUNDRY_PROJECT_ENDPOINT is not set.");
string deploymentName = Environment.GetEnvironmentVariable("AZURE_FOUNDRY_PROJECT_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ private static async Task Main()

using var traceProvider = Sdk.CreateTracerProviderBuilder()
.SetResourceBuilder(resourceBuilder)
.AddSource("Microsoft.Agents.AI.Workflows*")
.AddSource(SourceName)
// The following source is only required if not specifying
// the `activitySource` in the WithOpenTelemetry call below
.AddSource("Microsoft.Agents.AI.Workflows*")
.AddAzureMonitorTraceExporter(options => options.ConnectionString = applicationInsightsConnectionString)
.Build();

Expand All @@ -51,6 +53,10 @@ private static async Task Main()
// Build the workflow by connecting executors sequentially
var workflow = new WorkflowBuilder(uppercase)
.AddEdge(uppercase, reverse)
.WithOpenTelemetry(
// Set `EnableSensitiveData` to true to include message content in traces
configure: cfg => cfg.EnableSensitiveData = true,
activitySource: s_activitySource)
.Build();

// Execute the workflow with input data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ private static async Task Main()

using var traceProvider = Sdk.CreateTracerProviderBuilder()
.SetResourceBuilder(resourceBuilder)
.AddSource("Microsoft.Agents.AI.Workflows*")
.AddSource(SourceName)
// The following source is only required if not specifying
// the `activitySource` in the WithOpenTelemetry call below
.AddSource("Microsoft.Agents.AI.Workflows*")
.AddOtlpExporter(options => options.Endpoint = new Uri(otlpEndpoint))
.Build();

Expand All @@ -53,6 +55,10 @@ private static async Task Main()
// Build the workflow by connecting executors sequentially
var workflow = new WorkflowBuilder(uppercase)
.AddEdge(uppercase, reverse)
.WithOpenTelemetry(
// Set `EnableSensitiveData` to true to include message content in traces
configure: cfg => cfg.EnableSensitiveData = true,
activitySource: s_activitySource)
.Build();

// Execute the workflow with input data
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Diagnostics;
using Microsoft.Agents.AI.Workflows.Observability;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
Expand Down Expand Up @@ -41,4 +44,23 @@ public sealed class DeclarativeWorkflowOptions(WorkflowAgentProvider agentProvid
/// Gets the <see cref="ILoggerFactory"/> used to create loggers for workflow components.
/// </summary>
public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance;

/// <summary>
/// Gets the callback to configure telemetry options.
/// </summary>
public Action<WorkflowTelemetryOptions>? ConfigureTelemetry { get; init; }

/// <summary>
/// Gets an optional <see cref="ActivitySource"/> for telemetry.
/// If provided, the caller retains ownership and is responsible for disposal.
/// If <see langword="null"/> but <see cref="ConfigureTelemetry"/> is set, a shared default
/// activity source named "Microsoft.Agents.AI.Workflows" will be used.
/// </summary>
public ActivitySource? TelemetryActivitySource { get; init; }

/// <summary>
/// Gets a value indicating whether telemetry is enabled.
/// Telemetry is enabled when either <see cref="ConfigureTelemetry"/> or <see cref="TelemetryActivitySource"/> is set.
/// </summary>
internal bool IsTelemetryEnabled => this.ConfigureTelemetry is not null || this.TelemetryActivitySource is not null;
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ public Workflow Complete()

this._workflowModel.Build(builder);

// Apply telemetry if configured
if (this._workflowOptions.IsTelemetryEnabled)
{
builder.WorkflowBuilder.WithOpenTelemetry(
this._workflowOptions.ConfigureTelemetry,
this._workflowOptions.TelemetryActivitySource);
}

// Build final workflow
return builder.WorkflowBuilder.Build(validateOrphans: false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ private async ValueTask<Executor> FindRouterAsync(IStepTracer? tracer) => await

protected internal override async ValueTask<DeliveryMapping?> ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer)
{
using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess);
using var activity = this.StartActivity();
activity?
.SetTag(Tags.EdgeGroupType, nameof(DirectEdgeRunner))
.SetTag(Tags.MessageSourceId, this.EdgeData.SourceId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ internal interface IStatefulEdgeRunner

internal abstract class EdgeRunner
{
protected static readonly string s_namespace = typeof(EdgeRunner).Namespace!;
protected static readonly ActivitySource s_activitySource = new(s_namespace);

// TODO: Can this be sync?
protected internal abstract ValueTask<DeliveryMapping?> ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer);
}
Expand All @@ -26,4 +23,6 @@ internal abstract class EdgeRunner<TEdgeData>(
{
protected IRunnerContext RunContext { get; } = Throw.IfNull(runContext);
protected TEdgeData EdgeData { get; } = Throw.IfNull(edgeData);

protected Activity? StartActivity() => this.RunContext.TelemetryContext.StartEdgeGroupProcessActivity();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal sealed class FanInEdgeRunner(IRunnerContext runContext, FanInEdgeData e
{
Debug.Assert(!envelope.IsExternal, "FanIn edges should never be chased from external input");

using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess);
using var activity = this.StartActivity();
activity?
.SetTag(Tags.EdgeGroupType, nameof(FanInEdgeRunner))
.SetTag(Tags.MessageTargetId, this.EdgeData.SinkId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ internal sealed class FanOutEdgeRunner(IRunnerContext runContext, FanOutEdgeData
{
protected internal override async ValueTask<DeliveryMapping?> ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer)
{
using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess);
using var activity = this.StartActivity();
activity?
.SetTag(Tags.EdgeGroupType, nameof(FanOutEdgeRunner))
.SetTag(Tags.MessageSourceId, this.EdgeData.SourceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Observability;

namespace Microsoft.Agents.AI.Workflows.Execution;

internal interface IRunnerContext : IExternalRequestSink, ISuperStepJoinContext
{
WorkflowTelemetryContext TelemetryContext { get; }

ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default);
ValueTask SendMessageAsync(string sourceId, object message, string? targetId = null, CancellationToken cancellationToken = default);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Observability;

namespace Microsoft.Agents.AI.Workflows.Execution;

Expand All @@ -12,6 +13,8 @@ internal interface ISuperStepRunner

string StartExecutorId { get; }

WorkflowTelemetryContext TelemetryContext { get; }

bool HasUnservicedRequests { get; }
bool HasUnprocessedMessages { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ namespace Microsoft.Agents.AI.Workflows.Execution;

internal sealed class LockstepRunEventStream : IRunEventStream
{
private static readonly string s_namespace = typeof(LockstepRunEventStream).Namespace!;
private static readonly ActivitySource s_activitySource = new(s_namespace);

private readonly CancellationTokenSource _stopCancellation = new();
private readonly InputWaiter _inputWaiter = new();
private int _isDisposed;
Expand Down Expand Up @@ -53,7 +50,7 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe

this._stepRunner.OutgoingEvents.EventRaised += OnWorkflowEventAsync;

using Activity? activity = s_activitySource.StartActivity(ActivityNames.WorkflowRun);
using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.RunId, this._stepRunner.RunId);

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static ResponseEdgeRunner ForPort(IRunnerContext runContext, string execu
{
Debug.Assert(envelope.IsExternal, "Input edges should only be chased from external input");

using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess);
using var activity = this.StartActivity();
activity?
.SetTag(Tags.EdgeGroupType, nameof(ResponseEdgeRunner))
.SetTag(Tags.MessageSourceId, envelope.SourceId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ namespace Microsoft.Agents.AI.Workflows.Execution;
/// </summary>
internal sealed class StreamingRunEventStream : IRunEventStream
{
private static readonly string s_namespace = typeof(StreamingRunEventStream).Namespace!;
private static readonly ActivitySource s_activitySource = new(s_namespace);

private readonly Channel<WorkflowEvent> _eventChannel;
private readonly ISuperStepRunner _stepRunner;
private readonly InputWaiter _inputWaiter;
Expand Down Expand Up @@ -63,7 +60,7 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
// Subscribe to events - they will flow directly to the channel as they're raised
this._stepRunner.OutgoingEvents.EventRaised += OnEventRaisedAsync;

using Activity? activity = s_activitySource.StartActivity(ActivityNames.WorkflowRun);
using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.RunId, this._stepRunner.RunId);

try
Expand Down
20 changes: 11 additions & 9 deletions dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ public abstract class Executor : IIdentified
/// </summary>
public string Id { get; }

private static readonly string s_namespace = typeof(Executor).Namespace!;
private static readonly ActivitySource s_activitySource = new(s_namespace);

// TODO: Add overloads for binding with a configuration/options object once the Configured<T> hierarchy goes away.

/// <summary>
Expand Down Expand Up @@ -142,13 +139,13 @@ private set
/// <returns>A ValueTask representing the asynchronous operation, wrapping the output from the executor.</returns>
/// <exception cref="NotSupportedException">No handler found for the message type.</exception>
/// <exception cref="TargetInvocationException">An exception is generated while handling the message.</exception>
public async ValueTask<object?> ExecuteAsync(object message, TypeId messageType, IWorkflowContext context, CancellationToken cancellationToken = default)
public ValueTask<object?> ExecuteAsync(object message, TypeId messageType, IWorkflowContext context, CancellationToken cancellationToken = default)
=> this.ExecuteAsync(message, messageType, context, WorkflowTelemetryContext.Disabled, cancellationToken);

internal async ValueTask<object?> ExecuteAsync(object message, TypeId messageType, IWorkflowContext context, WorkflowTelemetryContext telemetryContext, CancellationToken cancellationToken = default)
{
using var activity = s_activitySource.StartActivity(ActivityNames.ExecutorProcess, ActivityKind.Internal);
activity?.SetTag(Tags.ExecutorId, this.Id)
.SetTag(Tags.ExecutorType, this.GetType().FullName)
.SetTag(Tags.MessageType, messageType.TypeName)
.CreateSourceLinks(context.TraceContext);
using var activity = telemetryContext.StartExecutorProcessActivity(this.Id, this.GetType().FullName, messageType.TypeName, message);
activity?.CreateSourceLinks(context.TraceContext);

await context.AddEventAsync(new ExecutorInvokedEvent(this.Id, message), cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -183,6 +180,11 @@ private set
return null; // Void result.
}

// Output is not available if executor does not return anything, in which case
// messages sent in the handlers of this executor will be set in the message
// send activities.
telemetryContext.SetExecutorOutput(activity, result.Result);

// If we had a real return type, raise it as a SendMessage; TODO: Should we have a way to disable this behaviour?
if (result.Result is not null && this.Options.AutoSendMessageHandlerResultObject)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using Microsoft.Agents.AI.Workflows.Execution;
using Microsoft.Agents.AI.Workflows.Observability;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Agents.AI.Workflows.InProc;
Expand Down Expand Up @@ -70,6 +71,9 @@ private InProcessRunner(Workflow workflow, ICheckpointManager? checkpointManager
/// <inheritdoc cref="ISuperStepRunner.StartExecutorId"/>
public string StartExecutorId { get; }

/// <inheritdoc cref="ISuperStepRunner.TelemetryContext"/>
public WorkflowTelemetryContext TelemetryContext => this.Workflow.TelemetryContext;

private readonly HashSet<Type> _knownValidInputTypes;
public async ValueTask<bool> IsValidInputTypeAsync(Type messageType, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -201,6 +205,7 @@ await executor.ExecuteAsync(
envelope.Message,
envelope.MessageType,
this.RunContext.BindWorkflowContext(receiverId, envelope.TraceContext),
this.TelemetryContext,
cancellationToken
).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public InProcessRunnerContext(
this.ConcurrentRunsEnabled = enableConcurrentRuns;
this.OutgoingEvents = outgoingEvents;
}
public WorkflowTelemetryContext TelemetryContext => this._workflow.TelemetryContext;

public IExternalRequestSink RegisterPort(string executorId, RequestPort port)
{
Expand Down Expand Up @@ -195,12 +196,10 @@ public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken ca
return this.OutgoingEvents.EnqueueAsync(workflowEvent);
}

private static readonly string s_namespace = typeof(IWorkflowContext).Namespace!;
private static readonly ActivitySource s_activitySource = new(s_namespace);

public async ValueTask SendMessageAsync(string sourceId, object message, string? targetId = null, CancellationToken cancellationToken = default)
{
using Activity? activity = s_activitySource.StartActivity(ActivityNames.MessageSend, ActivityKind.Producer);
using Activity? activity = this._workflow.TelemetryContext.StartMessageSendActivity(sourceId, targetId, message);

// Create a carrier for trace context propagation
var traceContext = activity is null ? null : new Dictionary<string, string>();
if (traceContext is not null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Microsoft.Agents.AI.Workflows.Observability;
internal static class ActivityNames
{
public const string WorkflowBuild = "workflow.build";
public const string WorkflowRun = "workflow.run";
public const string WorkflowRun = "workflow_invoke";
public const string MessageSend = "message.send";
public const string ExecutorProcess = "executor.process";
public const string EdgeGroupProcess = "edge_group.process";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ internal static class Tags
public const string RunId = "run.id";
public const string ExecutorId = "executor.id";
public const string ExecutorType = "executor.type";
public const string ExecutorInput = "executor.input";
public const string ExecutorOutput = "executor.output";
public const string MessageType = "message.type";
public const string MessageContent = "message.content";
public const string EdgeGroupType = "edge_group.type";
public const string MessageSourceId = "message.source_id";
public const string MessageTargetId = "message.target_id";
Expand Down
Loading
Loading