diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index 5da4e166d5..5350d1afb0 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -122,6 +122,7 @@ + diff --git a/dotnet/samples/GettingStarted/AgentOpenTelemetry/Program.cs b/dotnet/samples/GettingStarted/AgentOpenTelemetry/Program.cs index 5edfcf53d9..dd5c6f9c7d 100644 --- a/dotnet/samples/GettingStarted/AgentOpenTelemetry/Program.cs +++ b/dotnet/samples/GettingStarted/AgentOpenTelemetry/Program.cs @@ -125,7 +125,7 @@ static async Task GetWeatherAsync([Description("The location to get the instructions: "You are a helpful assistant that provides concise and informative responses.", tools: [AIFunctionFactory.Create(GetWeatherAsync)]) .AsBuilder() - .UseOpenTelemetry(SourceName) // enable telemetry at the agent level + .UseOpenTelemetry(SourceName, configure: (cfg) => cfg.EnableSensitiveData = true) // enable telemetry at the agent level .Build(); var thread = agent.GetNewThread(); @@ -134,6 +134,8 @@ static async Task GetWeatherAsync([Description("The location to get the // Create a parent span for the entire agent session using var sessionActivity = activitySource.StartActivity("Agent Session"); +Console.WriteLine($"Trace ID: {sessionActivity?.TraceId} "); + var sessionId = Guid.NewGuid().ToString("N"); sessionActivity? .SetTag("agent.name", "OpenTelemetryDemoAgent") @@ -147,7 +149,7 @@ static async Task GetWeatherAsync([Description("The location to get the while (true) { - Console.Write("You: "); + Console.Write("You (or 'exit' to quit): "); var userInput = Console.ReadLine(); if (string.IsNullOrWhiteSpace(userInput) || userInput.Equals("exit", StringComparison.OrdinalIgnoreCase)) diff --git a/dotnet/samples/GettingStarted/Workflows/Agents/WorkflowAsAnAgent/Program.cs b/dotnet/samples/GettingStarted/Workflows/Agents/WorkflowAsAnAgent/Program.cs index 75f0beb2da..6aa65d56b5 100644 --- a/dotnet/samples/GettingStarted/Workflows/Agents/WorkflowAsAnAgent/Program.cs +++ b/dotnet/samples/GettingStarted/Workflows/Agents/WorkflowAsAnAgent/Program.cs @@ -6,7 +6,7 @@ using Microsoft.Agents.AI.Workflows; using Microsoft.Extensions.AI; -namespace WorkflowAsAnAgentsSample; +namespace WorkflowAsAnAgentSample; /// /// This sample introduces the concepts workflows as agents, where a workflow can be @@ -61,9 +61,9 @@ static async Task ProcessInputAsync(AIAgent agent, AgentThread thread, string in Dictionary> buffer = []; await foreach (AgentRunResponseUpdate update in agent.RunStreamingAsync(input, thread)) { - if (update.MessageId is null) + if (update.MessageId is null || string.IsNullOrEmpty(update.Text)) { - // skip updates that don't have a message ID + // skip updates that don't have a message ID or text continue; } Console.Clear(); diff --git a/dotnet/samples/GettingStarted/Workflows/Agents/WorkflowAsAnAgent/WorkflowFactory.cs b/dotnet/samples/GettingStarted/Workflows/Agents/WorkflowAsAnAgent/WorkflowFactory.cs index 7d8768c226..736c51d555 100644 --- a/dotnet/samples/GettingStarted/Workflows/Agents/WorkflowAsAnAgent/WorkflowFactory.cs +++ b/dotnet/samples/GettingStarted/Workflows/Agents/WorkflowAsAnAgent/WorkflowFactory.cs @@ -4,7 +4,7 @@ using Microsoft.Agents.AI.Workflows; using Microsoft.Extensions.AI; -namespace WorkflowAsAnAgentsSample; +namespace WorkflowAsAnAgentSample; internal static class WorkflowFactory { @@ -41,44 +41,43 @@ private static ChatClientAgent GetLanguageAgent(string targetLanguage, IChatClie /// /// Executor that starts the concurrent processing by sending messages to the agents. /// - private sealed class ConcurrentStartExecutor() : - Executor>("ConcurrentStartExecutor") + private sealed class ConcurrentStartExecutor() : Executor("ConcurrentStartExecutor") { - /// - /// Starts the concurrent processing by sending messages to the agents. - /// - /// The user message to process - /// Workflow context for accessing workflow services and adding events - /// The to monitor for cancellation requests. - /// The default is . - public override async ValueTask HandleAsync(List message, IWorkflowContext context, CancellationToken cancellationToken = default) + protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) { - // Broadcast the message to all connected agents. Receiving agents will queue - // the message but will not start processing until they receive a turn token. - await context.SendMessageAsync(message, cancellationToken: cancellationToken); - // Broadcast the turn token to kick off the agents. - await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken: cancellationToken); + return routeBuilder + .AddHandler>(this.RouteMessages) + .AddHandler(this.RouteTurnTokenAsync); + } + + private ValueTask RouteMessages(List messages, IWorkflowContext context, CancellationToken cancellationToken) + { + return context.SendMessageAsync(messages, cancellationToken: cancellationToken); + } + + private ValueTask RouteTurnTokenAsync(TurnToken token, IWorkflowContext context, CancellationToken cancellationToken) + { + return context.SendMessageAsync(token, cancellationToken: cancellationToken); } } /// /// Executor that aggregates the results from the concurrent agents. /// - private sealed class ConcurrentAggregationExecutor() : - Executor("ConcurrentAggregationExecutor") + private sealed class ConcurrentAggregationExecutor() : Executor>("ConcurrentAggregationExecutor") { private readonly List _messages = []; /// /// Handles incoming messages from the agents and aggregates their responses. /// - /// The message from the agent + /// The messages from the agent /// Workflow context for accessing workflow services and adding events /// The to monitor for cancellation requests. /// The default is . - public override async ValueTask HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default) + public override async ValueTask HandleAsync(List message, IWorkflowContext context, CancellationToken cancellationToken = default) { - this._messages.Add(message); + this._messages.AddRange(message); if (this._messages.Count == 2) { diff --git a/dotnet/samples/GettingStarted/Workflows/Concurrent/Concurrent/Program.cs b/dotnet/samples/GettingStarted/Workflows/Concurrent/Concurrent/Program.cs index dc91338987..e1f71e2311 100644 --- a/dotnet/samples/GettingStarted/Workflows/Concurrent/Concurrent/Program.cs +++ b/dotnet/samples/GettingStarted/Workflows/Concurrent/Concurrent/Program.cs @@ -97,21 +97,21 @@ public override async ValueTask HandleAsync(string message, IWorkflowContext con /// Executor that aggregates the results from the concurrent agents. /// internal sealed class ConcurrentAggregationExecutor() : - Executor("ConcurrentAggregationExecutor") + Executor>("ConcurrentAggregationExecutor") { private readonly List _messages = []; /// /// Handles incoming messages from the agents and aggregates their responses. /// - /// The message from the agent + /// The messages from the agent /// Workflow context for accessing workflow services and adding events /// The to monitor for cancellation requests. /// The default is . /// A task representing the asynchronous operation - public override async ValueTask HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default) + public override async ValueTask HandleAsync(List message, IWorkflowContext context, CancellationToken cancellationToken = default) { - this._messages.Add(message); + this._messages.AddRange(message); if (this._messages.Count == 2) { diff --git a/dotnet/samples/GettingStarted/Workflows/Observability/WorkflowAsAnAgent/Program.cs b/dotnet/samples/GettingStarted/Workflows/Observability/WorkflowAsAnAgent/Program.cs new file mode 100644 index 0000000000..17d7d03b3f --- /dev/null +++ b/dotnet/samples/GettingStarted/Workflows/Observability/WorkflowAsAnAgent/Program.cs @@ -0,0 +1,140 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Diagnostics; +using Azure.AI.OpenAI; +using Azure.Identity; +using Azure.Monitor.OpenTelemetry.Exporter; +using Microsoft.Agents.AI; +using Microsoft.Agents.AI.Workflows; +using Microsoft.Extensions.AI; +using OpenTelemetry; +using OpenTelemetry.Resources; +using OpenTelemetry.Trace; + +namespace WorkflowAsAnAgentObservabilitySample; + +/// +/// This sample shows how to enable OpenTelemetry observability for workflows when +/// using them as s. +/// +/// In this example, we create a workflow that uses two language agents to process +/// input concurrently, one that responds in French and another that responds in English. +/// +/// You will interact with the workflow in an interactive loop, sending messages and receiving +/// streaming responses from the workflow as if it were an agent who responds in both languages. +/// +/// OpenTelemetry observability is enabled at multiple levels: +/// 1. At the chat client level, capturing telemetry for interactions with the Azure OpenAI service. +/// 2. At the agent level, capturing telemetry for agent operations. +/// 3. At the workflow level, capturing telemetry for workflow execution. +/// +/// Traces will be sent to an Aspire dashboard via an OTLP endpoint, and optionally to +/// Azure Monitor if an Application Insights connection string is provided. +/// +/// Learn how to set up an Aspire dashboard here: +/// https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/dashboard/standalone?tabs=bash +/// +/// +/// Pre-requisites: +/// - Foundational samples should be completed first. +/// - This sample uses concurrent processing. +/// - An Azure OpenAI endpoint and deployment name. +/// - An Application Insights resource for telemetry (optional). +/// +public static class Program +{ + private const string SourceName = "Workflow.ApplicationInsightsSample"; + private static readonly ActivitySource s_activitySource = new(SourceName); + + private static async Task Main() + { + // Set up observability + var applicationInsightsConnectionString = Environment.GetEnvironmentVariable("APPLICATIONINSIGHTS_CONNECTION_STRING"); + var otlpEndpoint = Environment.GetEnvironmentVariable("OTLP_ENDPOINT") ?? "http://localhost:4317"; + + var resourceBuilder = ResourceBuilder + .CreateDefault() + .AddService("WorkflowSample"); + + var traceProviderBuilder = Sdk.CreateTracerProviderBuilder() + .SetResourceBuilder(resourceBuilder) + .AddSource("Microsoft.Agents.AI.*") // Agent Framework telemetry + .AddSource("Microsoft.Extensions.AI.*") // Extensions AI telemetry + .AddSource(SourceName); + + traceProviderBuilder.AddOtlpExporter(options => options.Endpoint = new Uri(otlpEndpoint)); + if (!string.IsNullOrWhiteSpace(applicationInsightsConnectionString)) + { + traceProviderBuilder.AddAzureMonitorTraceExporter(options => options.ConnectionString = applicationInsightsConnectionString); + } + + using var traceProvider = traceProviderBuilder.Build(); + + // Set up the Azure OpenAI client + var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set."); + var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini"; + var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()) + .GetChatClient(deploymentName) + .AsIChatClient() + .AsBuilder() + .UseOpenTelemetry(sourceName: SourceName, configure: (cfg) => cfg.EnableSensitiveData = true) // enable telemetry at the chat client level + .Build(); + + // Start a root activity for the application + using var activity = s_activitySource.StartActivity("main"); + Console.WriteLine($"Operation/Trace ID: {Activity.Current?.TraceId}"); + + // Create the workflow and turn it into an agent with OpenTelemetry instrumentation + var workflow = WorkflowHelper.GetWorkflow(chatClient, SourceName); + var agent = new OpenTelemetryAgent(workflow.AsAgent("workflow-agent", "Workflow Agent"), SourceName) + { + EnableSensitiveData = true // enable sensitive data at the agent level such as prompts and responses + }; + var thread = agent.GetNewThread(); + + // Start an interactive loop to interact with the workflow as if it were an agent + while (true) + { + Console.WriteLine(); + Console.Write("User (or 'exit' to quit): "); + string? input = Console.ReadLine(); + if (string.IsNullOrWhiteSpace(input) || input.Equals("exit", StringComparison.OrdinalIgnoreCase)) + { + break; + } + + await ProcessInputAsync(agent, thread, input); + } + + // Helper method to process user input and display streaming responses. To display + // multiple interleaved responses correctly, we buffer updates by message ID and + // re-render all messages on each update. + static async Task ProcessInputAsync(AIAgent agent, AgentThread thread, string input) + { + Dictionary> buffer = []; + await foreach (AgentRunResponseUpdate update in agent.RunStreamingAsync(input, thread)) + { + if (update.MessageId is null || string.IsNullOrEmpty(update.Text)) + { + // skip updates that don't have a message ID or text + continue; + } + Console.Clear(); + + if (!buffer.TryGetValue(update.MessageId, out List? value)) + { + value = []; + buffer[update.MessageId] = value; + } + value.Add(update); + + foreach (var (messageId, segments) in buffer) + { + string combinedText = string.Concat(segments); + Console.WriteLine($"{segments[0].AuthorName}: {combinedText}"); + Console.WriteLine(); + } + } + } + } +} diff --git a/dotnet/samples/GettingStarted/Workflows/Observability/WorkflowAsAnAgent/WorkflowAsAnAgentObservability.csproj b/dotnet/samples/GettingStarted/Workflows/Observability/WorkflowAsAnAgent/WorkflowAsAnAgentObservability.csproj new file mode 100644 index 0000000000..17c44eeb75 --- /dev/null +++ b/dotnet/samples/GettingStarted/Workflows/Observability/WorkflowAsAnAgent/WorkflowAsAnAgentObservability.csproj @@ -0,0 +1,27 @@ + + + + Exe + net9.0 + + enable + enable + + + + + + + + + + + + + + + + + + + diff --git a/dotnet/samples/GettingStarted/Workflows/Observability/WorkflowAsAnAgent/WorkflowHelper.cs b/dotnet/samples/GettingStarted/Workflows/Observability/WorkflowAsAnAgent/WorkflowHelper.cs new file mode 100644 index 0000000000..816dce50d0 --- /dev/null +++ b/dotnet/samples/GettingStarted/Workflows/Observability/WorkflowAsAnAgent/WorkflowHelper.cs @@ -0,0 +1,98 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Agents.AI; +using Microsoft.Agents.AI.Workflows; +using Microsoft.Extensions.AI; + +namespace WorkflowAsAnAgentObservabilitySample; + +internal static class WorkflowHelper +{ + /// + /// Creates a workflow that uses two language agents to process input concurrently. + /// + /// The chat client to use for the agents + /// The source name for OpenTelemetry instrumentation + /// A workflow that processes input using two language agents + internal static Workflow GetWorkflow(IChatClient chatClient, string sourceName) + { + // Create executors + var startExecutor = new ConcurrentStartExecutor(); + var aggregationExecutor = new ConcurrentAggregationExecutor(); + AIAgent frenchAgent = GetLanguageAgent("French", chatClient, sourceName); + AIAgent englishAgent = GetLanguageAgent("English", chatClient, sourceName); + + // Build the workflow by adding executors and connecting them + return new WorkflowBuilder(startExecutor) + .AddFanOutEdge(startExecutor, targets: [frenchAgent, englishAgent]) + .AddFanInEdge(aggregationExecutor, sources: [frenchAgent, englishAgent]) + .WithOutputFrom(aggregationExecutor) + .Build(); + } + + /// + /// Creates a language agent for the specified target language. + /// + /// The target language for translation + /// The chat client to use for the agent + /// The source name for OpenTelemetry instrumentation + /// An AIAgent configured for the specified language + private static AIAgent GetLanguageAgent(string targetLanguage, IChatClient chatClient, string sourceName) => + new ChatClientAgent( + chatClient, + instructions: $"You're a helpful assistant who always responds in {targetLanguage}.", + name: $"{targetLanguage}Agent" + ) + .AsBuilder() + .UseOpenTelemetry(sourceName, configure: (cfg) => cfg.EnableSensitiveData = true) // enable telemetry at the agent level + .Build(); + + /// + /// Executor that starts the concurrent processing by sending messages to the agents. + /// + private sealed class ConcurrentStartExecutor() : Executor("ConcurrentStartExecutor") + { + protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) + { + return routeBuilder + .AddHandler>(this.RouteMessages) + .AddHandler(this.RouteTurnTokenAsync); + } + + private ValueTask RouteMessages(List messages, IWorkflowContext context, CancellationToken cancellationToken) + { + return context.SendMessageAsync(messages, cancellationToken: cancellationToken); + } + + private ValueTask RouteTurnTokenAsync(TurnToken token, IWorkflowContext context, CancellationToken cancellationToken) + { + return context.SendMessageAsync(token, cancellationToken: cancellationToken); + } + } + + /// + /// Executor that aggregates the results from the concurrent agents. + /// + private sealed class ConcurrentAggregationExecutor() : Executor>("ConcurrentAggregationExecutor") + { + private readonly List _messages = []; + + /// + /// Handles incoming messages from the agents and aggregates their responses. + /// + /// The message from the agent + /// Workflow context for accessing workflow services and adding events + /// The to monitor for cancellation requests. + /// The default is . + public override async ValueTask HandleAsync(List message, IWorkflowContext context, CancellationToken cancellationToken = default) + { + this._messages.AddRange(message); + + if (this._messages.Count == 2) + { + var formattedMessages = string.Join(Environment.NewLine, this._messages.Select(m => $"{m.Text}")); + await context.YieldOutputAsync(formattedMessages, cancellationToken); + } + } + } +}