Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,10 @@ namespace Microsoft.Agents.AI.Workflows;
/// <param name="e">
/// Optionally, the <see cref="Exception"/> representing the error.
/// </param>
public class WorkflowErrorEvent(Exception? e) : WorkflowEvent(e);
public class WorkflowErrorEvent(Exception? e) : WorkflowEvent(e)
{
/// <summary>
/// Gets the exception that caused the current operation to fail, if one occurred.
/// </summary>
public Exception? Exception => this.Data as Exception;
}
8 changes: 5 additions & 3 deletions dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@ internal sealed class WorkflowHostAgent : AIAgent
private readonly string? _id;
private readonly CheckpointManager? _checkpointManager;
private readonly IWorkflowExecutionEnvironment _executionEnvironment;
private readonly bool _includeExceptionDetails;
private readonly Task<ProtocolDescriptor> _describeTask;

private readonly ConcurrentDictionary<string, string> _assignedRunIds = [];

public WorkflowHostAgent(Workflow workflow, string? id = null, string? name = null, string? description = null, CheckpointManager? checkpointManager = null, IWorkflowExecutionEnvironment? executionEnvironment = null)
public WorkflowHostAgent(Workflow workflow, string? id = null, string? name = null, string? description = null, CheckpointManager? checkpointManager = null, IWorkflowExecutionEnvironment? executionEnvironment = null, bool includeExceptionDetails = false)
{
this._workflow = Throw.IfNull(workflow);

this._executionEnvironment = executionEnvironment ?? (workflow.AllowConcurrent
? InProcessExecution.Concurrent
: InProcessExecution.OffThread);
this._checkpointManager = checkpointManager;
this._includeExceptionDetails = includeExceptionDetails;

this._id = id;
this.Name = name;
Expand Down Expand Up @@ -61,10 +63,10 @@ private async ValueTask ValidateWorkflowAsync()
protocol.ThrowIfNotChatProtocol();
}

public override AgentThread GetNewThread() => new WorkflowThread(this._workflow, this.GenerateNewId(), this._executionEnvironment, this._checkpointManager);
public override AgentThread GetNewThread() => new WorkflowThread(this._workflow, this.GenerateNewId(), this._executionEnvironment, this._checkpointManager, this._includeExceptionDetails);

public override AgentThread DeserializeThread(JsonElement serializedThread, JsonSerializerOptions? jsonSerializerOptions = null)
=> new WorkflowThread(this._workflow, serializedThread, this._executionEnvironment, this._checkpointManager, jsonSerializerOptions);
=> new WorkflowThread(this._workflow, serializedThread, this._executionEnvironment, this._checkpointManager, this._includeExceptionDetails, jsonSerializerOptions);

private ValueTask<WorkflowThread> UpdateThreadAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, CancellationToken cancellationToken = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ public static class WorkflowHostingExtensions
/// <param name="executionEnvironment">Specify the execution environment to use when running the workflows. See
/// <see cref="InProcessExecution.OffThread"/>, <see cref="InProcessExecution.Concurrent"/> and
/// <see cref="InProcessExecution.Lockstep"/> for the in-process environments.</param>
/// <param name="includeExceptionDetails">If <see langword="true"/>, will include <see cref="System.Exception.Message"/>
/// in the <see cref="ErrorContent"/> representing the workflow error.</param>
/// <returns></returns>
public static AIAgent AsAgent(
this Workflow workflow,
string? id = null,
string? name = null,
string? description = null,
CheckpointManager? checkpointManager = null,
IWorkflowExecutionEnvironment? executionEnvironment = null)
IWorkflowExecutionEnvironment? executionEnvironment = null,
bool includeExceptionDetails = false)
{
return new WorkflowHostAgent(workflow, id, name, description, checkpointManager, executionEnvironment);
return new WorkflowHostAgent(workflow, id, name, description, checkpointManager, executionEnvironment, includeExceptionDetails);
}

internal static FunctionCallContent ToFunctionCall(this ExternalRequest request)
Expand Down
33 changes: 28 additions & 5 deletions dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading;
Expand All @@ -17,14 +18,16 @@ internal sealed class WorkflowThread : AgentThread
{
private readonly Workflow _workflow;
private readonly IWorkflowExecutionEnvironment _executionEnvironment;
private readonly bool _includeExceptionDetails;

private readonly CheckpointManager _checkpointManager;
private readonly InMemoryCheckpointManager? _inMemoryCheckpointManager;

public WorkflowThread(Workflow workflow, string runId, IWorkflowExecutionEnvironment executionEnvironment, CheckpointManager? checkpointManager = null)
public WorkflowThread(Workflow workflow, string runId, IWorkflowExecutionEnvironment executionEnvironment, CheckpointManager? checkpointManager = null, bool includeExceptionDetails = false)
{
this._workflow = Throw.IfNull(workflow);
this._executionEnvironment = Throw.IfNull(executionEnvironment);
this._includeExceptionDetails = includeExceptionDetails;

// If the user provided an external checkpoint manager, use that, otherwise rely on an in-memory one.
// TODO: Implement persist-only-last functionality for in-memory checkpoint manager, to avoid unbounded
Expand All @@ -35,7 +38,7 @@ public WorkflowThread(Workflow workflow, string runId, IWorkflowExecutionEnviron
this.MessageStore = new WorkflowMessageStore();
}

public WorkflowThread(Workflow workflow, JsonElement serializedThread, IWorkflowExecutionEnvironment executionEnvironment, CheckpointManager? checkpointManager = null, JsonSerializerOptions? jsonSerializerOptions = null)
public WorkflowThread(Workflow workflow, JsonElement serializedThread, IWorkflowExecutionEnvironment executionEnvironment, CheckpointManager? checkpointManager = null, bool includeExceptionDetails = false, JsonSerializerOptions? jsonSerializerOptions = null)
{
this._workflow = Throw.IfNull(workflow);
this._executionEnvironment = Throw.IfNull(executionEnvironment);
Expand Down Expand Up @@ -80,7 +83,7 @@ public override JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptio
return marshaller.Marshal(info);
}

public AgentRunResponseUpdate CreateUpdate(string responseId, params AIContent[] parts)
public AgentRunResponseUpdate CreateUpdate(string responseId, object raw, params AIContent[] parts)
{
Throw.IfNullOrEmpty(parts);

Expand All @@ -89,7 +92,8 @@ public AgentRunResponseUpdate CreateUpdate(string responseId, params AIContent[]
CreatedAt = DateTimeOffset.UtcNow,
MessageId = Guid.NewGuid().ToString("N"),
Role = ChatRole.Assistant,
ResponseId = responseId
ResponseId = responseId,
RawRepresentation = raw
};

this.MessageStore.AddMessages(update.ToChatMessage());
Expand Down Expand Up @@ -153,10 +157,29 @@ IAsyncEnumerable<AgentRunResponseUpdate> InvokeStageAsync(

case RequestInfoEvent requestInfo:
FunctionCallContent fcContent = requestInfo.Request.ToFunctionCall();
AgentRunResponseUpdate update = this.CreateUpdate(this.LastResponseId, fcContent);
AgentRunResponseUpdate update = this.CreateUpdate(this.LastResponseId, evt, fcContent);
yield return update;
break;

case WorkflowErrorEvent workflowError:
Exception? exception = workflowError.Exception;
if (exception is TargetInvocationException tie && tie.InnerException != null)
{
exception = tie.InnerException;
}

if (exception != null)
{
string message = this._includeExceptionDetails
? exception.Message
: "An error occurred while executing the workflow.";

ErrorContent errorContent = new(message);
yield return this.CreateUpdate(this.LastResponseId, evt, errorContent);
}

break;

case SuperStepCompletedEvent stepCompleted:
this.LastCheckpoint = stepCompleted.CompletionInfo?.Checkpoint;
goto default;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Extensions.AI;

namespace Microsoft.Agents.AI.Workflows.UnitTests;

public sealed class ExpectedException : Exception
{
public ExpectedException(string message)
: base(message)
{
}

public ExpectedException() : base()
{
}

public ExpectedException(string? message, Exception? innerException) : base(message, innerException)
{
}
}

public class WorkflowHostSmokeTests
{
private sealed class AlwaysFailsAIAgent(bool failByThrowing) : AIAgent
{
private sealed class Thread : InMemoryAgentThread
{
public Thread() { }

public Thread(JsonElement serializedThread, JsonSerializerOptions? jsonSerializerOptions = null)
: base(serializedThread, jsonSerializerOptions)
{ }
}

public override AgentThread DeserializeThread(JsonElement serializedThread, JsonSerializerOptions? jsonSerializerOptions = null)
{
return new Thread(serializedThread, jsonSerializerOptions);
}

public override AgentThread GetNewThread()
{
return new Thread();
}

protected override async Task<AgentRunResponse> RunCoreAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
return await this.RunStreamingAsync(messages, thread, options, cancellationToken)
.ToAgentRunResponseAsync(cancellationToken);
}

protected override async IAsyncEnumerable<AgentRunResponseUpdate> RunCoreStreamingAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
const string ErrorMessage = "Simulated agent failure.";
if (failByThrowing)
{
throw new ExpectedException(ErrorMessage);
}

yield return new AgentRunResponseUpdate(ChatRole.Assistant, [new ErrorContent(ErrorMessage)]);
}
}

private static Workflow CreateWorkflow(bool failByThrowing)
{
ExecutorBinding agent = new AlwaysFailsAIAgent(failByThrowing).BindAsExecutor(emitEvents: true);

return new WorkflowBuilder(agent).Build();
}

[Theory]
[InlineData(true, true)]
[InlineData(true, false)]
[InlineData(false, true)]
[InlineData(false, false)]
public async Task Test_AsAgent_ErrorContentStreamedOutAsync(bool includeExceptionDetails, bool failByThrowing)
{
string expectedMessage = !failByThrowing || includeExceptionDetails
? "Simulated agent failure."
: "An error occurred while executing the workflow.";

// Arrange is done by the caller.
Workflow workflow = CreateWorkflow(failByThrowing);

// Act
List<AgentRunResponseUpdate> updates = await workflow.AsAgent("WorkflowAgent", includeExceptionDetails: includeExceptionDetails)
.RunStreamingAsync(new ChatMessage(ChatRole.User, "Hello"))
.ToListAsync();

// Assert
bool hadErrorContent = false;
foreach (AgentRunResponseUpdate update in updates)
{
if (update.Contents.Any())
{
// We should expect a single update which contains the error content.
update.Contents.Should().ContainSingle()
.Which.Should().BeOfType<ErrorContent>()
.Which.Message.Should().Be(expectedMessage);
hadErrorContent = true;
}
}

hadErrorContent.Should().BeTrue();
}
}
Loading