Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dotnet/agent-framework-dotnet.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
<Folder Name="/Samples/GettingStarted/Workflows/Observability/">
<Project Path="samples/GettingStarted/Workflows/Observability/ApplicationInsights/ApplicationInsights.csproj" />
<Project Path="samples/GettingStarted/Workflows/Observability/AspireDashboard/AspireDashboard.csproj" />
<Project Path="samples/GettingStarted/Workflows/Observability/WorkflowAsAnAgent/WorkflowAsAnAgentObservability.csproj" />
</Folder>
<Folder Name="/Samples/GettingStarted/Workflows/Visualization/">
<Project Path="samples/GettingStarted/Workflows/Visualization/Visualization.csproj" Id="99bf0bc6-2440-428e-b3e7-d880e4b7a5fd" />
Expand Down
6 changes: 4 additions & 2 deletions dotnet/samples/GettingStarted/AgentOpenTelemetry/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ static async Task<string> GetWeatherAsync([Description("The location to get the
instructions: "You are a helpful assistant that provides concise and informative responses.",
tools: [AIFunctionFactory.Create(GetWeatherAsync)])
.AsBuilder()
.UseOpenTelemetry(SourceName) // enable telemetry at the agent level
.UseOpenTelemetry(SourceName, configure: (cfg) => cfg.EnableSensitiveData = true) // enable telemetry at the agent level
.Build();

var thread = agent.GetNewThread();
Expand All @@ -134,6 +134,8 @@ static async Task<string> GetWeatherAsync([Description("The location to get the

// Create a parent span for the entire agent session
using var sessionActivity = activitySource.StartActivity("Agent Session");
Console.WriteLine($"Trace ID: {sessionActivity?.TraceId} ");

var sessionId = Guid.NewGuid().ToString("N");
sessionActivity?
.SetTag("agent.name", "OpenTelemetryDemoAgent")
Expand All @@ -147,7 +149,7 @@ static async Task<string> GetWeatherAsync([Description("The location to get the

while (true)
{
Console.Write("You: ");
Console.Write("You (or 'exit' to quit): ");
var userInput = Console.ReadLine();

if (string.IsNullOrWhiteSpace(userInput) || userInput.Equals("exit", StringComparison.OrdinalIgnoreCase))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;

namespace WorkflowAsAnAgentsSample;
namespace WorkflowAsAnAgentSample;

/// <summary>
/// This sample introduces the concepts workflows as agents, where a workflow can be
Expand Down Expand Up @@ -61,9 +61,9 @@ static async Task ProcessInputAsync(AIAgent agent, AgentThread thread, string in
Dictionary<string, List<AgentRunResponseUpdate>> buffer = [];
await foreach (AgentRunResponseUpdate update in agent.RunStreamingAsync(input, thread))
{
if (update.MessageId is null)
if (update.MessageId is null || string.IsNullOrEmpty(update.Text))
{
// skip updates that don't have a message ID
// skip updates that don't have a message ID or text
continue;
}
Console.Clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;

namespace WorkflowAsAnAgentsSample;
namespace WorkflowAsAnAgentSample;

internal static class WorkflowFactory
{
Expand Down Expand Up @@ -41,44 +41,43 @@ private static ChatClientAgent GetLanguageAgent(string targetLanguage, IChatClie
/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
private sealed class ConcurrentStartExecutor() :
Executor<List<ChatMessage>>("ConcurrentStartExecutor")
private sealed class ConcurrentStartExecutor() : Executor("ConcurrentStartExecutor")
{
/// <summary>
/// Starts the concurrent processing by sending messages to the agents.
/// </summary>
/// <param name="message">The user message to process</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
{
// Broadcast the message to all connected agents. Receiving agents will queue
// the message but will not start processing until they receive a turn token.
await context.SendMessageAsync(message, cancellationToken: cancellationToken);
// Broadcast the turn token to kick off the agents.
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken: cancellationToken);
return routeBuilder
.AddHandler<List<ChatMessage>>(this.RouteMessages)
.AddHandler<TurnToken>(this.RouteTurnTokenAsync);
}

private ValueTask RouteMessages(List<ChatMessage> messages, IWorkflowContext context, CancellationToken cancellationToken)
{
return context.SendMessageAsync(messages, cancellationToken: cancellationToken);
}

private ValueTask RouteTurnTokenAsync(TurnToken token, IWorkflowContext context, CancellationToken cancellationToken)
{
return context.SendMessageAsync(token, cancellationToken: cancellationToken);
}
}

/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
private sealed class ConcurrentAggregationExecutor() :
Executor<ChatMessage>("ConcurrentAggregationExecutor")
private sealed class ConcurrentAggregationExecutor() : Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
private readonly List<ChatMessage> _messages = [];

/// <summary>
/// Handles incoming messages from the agents and aggregates their responses.
/// </summary>
/// <param name="message">The message from the agent</param>
/// <param name="message">The messages from the agent</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
public override async ValueTask HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.Add(message);
this._messages.AddRange(message);

if (this._messages.Count == 2)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,21 @@ public override async ValueTask HandleAsync(string message, IWorkflowContext con
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
Executor<ChatMessage>("ConcurrentAggregationExecutor")
Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
private readonly List<ChatMessage> _messages = [];

/// <summary>
/// Handles incoming messages from the agents and aggregates their responses.
/// </summary>
/// <param name="message">The message from the agent</param>
/// <param name="message">The messages from the agent</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.Add(message);
this._messages.AddRange(message);

if (this._messages.Count == 2)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Diagnostics;
using Azure.AI.OpenAI;
using Azure.Identity;
using Azure.Monitor.OpenTelemetry.Exporter;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

namespace WorkflowAsAnAgentObservabilitySample;

/// <summary>
/// This sample shows how to enable OpenTelemetry observability for workflows when
/// using them as <see cref="AIAgent"/>s.
///
/// In this example, we create a workflow that uses two language agents to process
/// input concurrently, one that responds in French and another that responds in English.
///
/// You will interact with the workflow in an interactive loop, sending messages and receiving
/// streaming responses from the workflow as if it were an agent who responds in both languages.
///
/// OpenTelemetry observability is enabled at multiple levels:
/// 1. At the chat client level, capturing telemetry for interactions with the Azure OpenAI service.
/// 2. At the agent level, capturing telemetry for agent operations.
/// 3. At the workflow level, capturing telemetry for workflow execution.
///
/// Traces will be sent to an Aspire dashboard via an OTLP endpoint, and optionally to
/// Azure Monitor if an Application Insights connection string is provided.
///
/// Learn how to set up an Aspire dashboard here:
/// https://learn.microsoft.com/en-us/dotnet/aspire/fundamentals/dashboard/standalone?tabs=bash
/// </summary>
/// <remarks>
/// Pre-requisites:
/// - Foundational samples should be completed first.
/// - This sample uses concurrent processing.
/// - An Azure OpenAI endpoint and deployment name.
/// - An Application Insights resource for telemetry (optional).
/// </remarks>
public static class Program
{
private const string SourceName = "Workflow.ApplicationInsightsSample";
private static readonly ActivitySource s_activitySource = new(SourceName);

private static async Task Main()
{
// Set up observability
var applicationInsightsConnectionString = Environment.GetEnvironmentVariable("APPLICATIONINSIGHTS_CONNECTION_STRING");
var otlpEndpoint = Environment.GetEnvironmentVariable("OTLP_ENDPOINT") ?? "http://localhost:4317";

var resourceBuilder = ResourceBuilder
.CreateDefault()
.AddService("WorkflowSample");

var traceProviderBuilder = Sdk.CreateTracerProviderBuilder()
.SetResourceBuilder(resourceBuilder)
.AddSource("Microsoft.Agents.AI.*") // Agent Framework telemetry
.AddSource("Microsoft.Extensions.AI.*") // Extensions AI telemetry
.AddSource(SourceName);

traceProviderBuilder.AddOtlpExporter(options => options.Endpoint = new Uri(otlpEndpoint));
if (!string.IsNullOrWhiteSpace(applicationInsightsConnectionString))
{
traceProviderBuilder.AddAzureMonitorTraceExporter(options => options.ConnectionString = applicationInsightsConnectionString);
}

using var traceProvider = traceProviderBuilder.Build();

// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName)
.AsIChatClient()
.AsBuilder()
.UseOpenTelemetry(sourceName: SourceName, configure: (cfg) => cfg.EnableSensitiveData = true) // enable telemetry at the chat client level
.Build();

// Start a root activity for the application
using var activity = s_activitySource.StartActivity("main");
Console.WriteLine($"Operation/Trace ID: {Activity.Current?.TraceId}");

// Create the workflow and turn it into an agent with OpenTelemetry instrumentation
var workflow = WorkflowHelper.GetWorkflow(chatClient, SourceName);
var agent = new OpenTelemetryAgent(workflow.AsAgent("workflow-agent", "Workflow Agent"), SourceName)
{
EnableSensitiveData = true // enable sensitive data at the agent level such as prompts and responses
};
var thread = agent.GetNewThread();

// Start an interactive loop to interact with the workflow as if it were an agent
while (true)
{
Console.WriteLine();
Console.Write("User (or 'exit' to quit): ");
string? input = Console.ReadLine();
if (string.IsNullOrWhiteSpace(input) || input.Equals("exit", StringComparison.OrdinalIgnoreCase))
{
break;
}

await ProcessInputAsync(agent, thread, input);
}

// Helper method to process user input and display streaming responses. To display
// multiple interleaved responses correctly, we buffer updates by message ID and
// re-render all messages on each update.
static async Task ProcessInputAsync(AIAgent agent, AgentThread thread, string input)
{
Dictionary<string, List<AgentRunResponseUpdate>> buffer = [];
await foreach (AgentRunResponseUpdate update in agent.RunStreamingAsync(input, thread))
{
if (update.MessageId is null || string.IsNullOrEmpty(update.Text))
{
// skip updates that don't have a message ID or text
continue;
}
Console.Clear();

if (!buffer.TryGetValue(update.MessageId, out List<AgentRunResponseUpdate>? value))
{
value = [];
buffer[update.MessageId] = value;
}
value.Add(update);

foreach (var (messageId, segments) in buffer)
{
string combinedText = string.Concat(segments);
Console.WriteLine($"{segments[0].AuthorName}: {combinedText}");
Console.WriteLine();
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net9.0</TargetFramework>

<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.AI.OpenAI" />
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Azure.Monitor.OpenTelemetry.Exporter" />
<PackageReference Include="Microsoft.Extensions.AI.OpenAI" />
<PackageReference Include="OpenTelemetry" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\..\..\src\Microsoft.Agents.AI.Workflows\Microsoft.Agents.AI.Workflows.csproj" />
<ProjectReference Include="..\..\..\..\..\src\Microsoft.Agents.AI.AzureAI\Microsoft.Agents.AI.AzureAI.csproj" />
<ProjectReference Include="..\..\..\..\..\src\Microsoft.Agents.AI\Microsoft.Agents.AI.csproj" />
</ItemGroup>

</Project>
Loading
Loading