From db0baa94fee706e059c781b562200c0afc1ab405 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Mon, 13 Oct 2025 13:46:06 -0700 Subject: [PATCH 1/2] AIAgentHostExecutor to use ToAgentRunResponse --- .../Specialized/AIAgentHostExecutor.cs | 40 ++----------------- .../SpecializedExecutorSmokeTests.cs | 38 +++--------------- 2 files changed, 8 insertions(+), 70 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs index 836399c5c1..23c951063b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs @@ -54,17 +54,10 @@ protected override async ValueTask TakeTurnAsync(List messages, IWo emitEvents ??= this._emitEvents; IAsyncEnumerable agentStream = this._agent.RunStreamingAsync(messages, this.EnsureThread(context), cancellationToken: cancellationToken); - List updates = []; - ChatMessage? currentStreamingMessage = null; + List updates = []; await foreach (AgentRunResponseUpdate update in agentStream.ConfigureAwait(false)) { - if (string.IsNullOrEmpty(update.MessageId)) - { - // Ignore updates that don't have a message ID. - continue; - } - if (emitEvents ?? this._emitEvents) { await context.AddEventAsync(new AgentRunUpdateEvent(this.Id, update), cancellationToken).ConfigureAwait(false); @@ -74,36 +67,9 @@ protected override async ValueTask TakeTurnAsync(List messages, IWo // In some sense: We should just let it be handled as a ChatMessage, though we should consider // providing some mechanisms to help the user complete the request, or route it out of the // workflow. - - if (currentStreamingMessage is null || currentStreamingMessage.MessageId != update.MessageId) - { - await PublishCurrentMessageAsync().ConfigureAwait(false); - currentStreamingMessage = new(update.Role ?? ChatRole.Assistant, update.Contents) - { - AuthorName = update.AuthorName, - CreatedAt = update.CreatedAt, - MessageId = update.MessageId, - RawRepresentation = update.RawRepresentation, - AdditionalProperties = update.AdditionalProperties - }; - } - - updates.AddRange(update.Contents); + updates.Add(update); } - await PublishCurrentMessageAsync().ConfigureAwait(false); - - async ValueTask PublishCurrentMessageAsync() - { - if (currentStreamingMessage is not null && updates.Count > 0) - { - currentStreamingMessage.Contents = updates; - updates = []; - - await context.SendMessageAsync(currentStreamingMessage, cancellationToken: cancellationToken).ConfigureAwait(false); - } - - currentStreamingMessage = null; - } + await context.SendMessageAsync(updates.ToAgentRunResponse().Messages, cancellationToken: cancellationToken).ConfigureAwait(false); } } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs index 9aea98f068..54e14d5f28 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs @@ -113,7 +113,7 @@ public sealed class TestAgentThread() : InMemoryAgentThread(); internal sealed class TestWorkflowContext : IWorkflowContext { - public List> Updates { get; } = []; + public List Updates { get; } = []; public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default) => default; @@ -140,11 +140,11 @@ public ValueTask SendMessageAsync(object message, string? targetId = null, Cance { if (message is List messages) { - this.Updates.Add(messages); + this.Updates.AddRange(messages); } else if (message is ChatMessage chatMessage) { - this.Updates.Add([chatMessage]); + this.Updates.Add(chatMessage); } return default; @@ -163,15 +163,6 @@ public async Task Test_AIAgentStreamingMessage_AggregationAsync() "Quisque dignissim ante odio, at facilisis orci porta a. Duis mi augue, fringilla eu egestas a, pellentesque sed lacus." ]; - string[][] splits = MessageStrings.Select(t => t.Split()).ToArray(); - foreach (string[] messageSplits in splits) - { - for (int i = 0; i < messageSplits.Length - 1; i++) - { - messageSplits[i] += ' '; - } - } - List expected = TestAIAgent.ToChatMessages(MessageStrings); TestAIAgent agent = new(expected); @@ -187,28 +178,9 @@ public async Task Test_AIAgentStreamingMessage_AggregationAsync() for (int i = 1; i < MessageStrings.Length; i++) { string expectedText = MessageStrings[i]; - string[] expectedSplits = splits[i]; - - ChatMessage equivalent = expected[i]; - List collected = collectingContext.Updates[i - 1]; - - collected.Should().HaveCount(1); - collected[0].Text.Should().Be(expectedText); - collected[0].Contents.Should().HaveCount(splits[i].Length); + ChatMessage collected = collectingContext.Updates[i - 1]; - Action[] splitCheckActions = splits[i].Select(MakeSplitCheckAction).ToArray(); - Assert.Collection(collected[0].Contents, splitCheckActions); - } - - Action MakeSplitCheckAction(string splitString) - { - return Check; - - void Check(AIContent content) - { - TextContent? text = content as TextContent; - text!.Text.Should().Be(splitString); - } + collected.Text.Should().Be(expectedText); } } } From 28fa60d3a7783749c0b29690282abf0d4eabdac8 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Mon, 20 Oct 2025 10:38:46 -0700 Subject: [PATCH 2/2] Only run agent in stream mode when emit event is true --- .../Specialized/AIAgentHostExecutor.cs | 32 +++++++++++-------- .../SpecializedExecutorSmokeTests.cs | 2 +- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs index 23c951063b..cd8386a268 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs @@ -51,25 +51,31 @@ protected internal override async ValueTask OnCheckpointRestoredAsync(IWorkflowC protected override async ValueTask TakeTurnAsync(List messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default) { - emitEvents ??= this._emitEvents; - IAsyncEnumerable agentStream = this._agent.RunStreamingAsync(messages, this.EnsureThread(context), cancellationToken: cancellationToken); + if (emitEvents ?? this._emitEvents) + { + // Run the agent in streaming mode only when agent run update events are to be emitted. + IAsyncEnumerable agentStream = this._agent.RunStreamingAsync(messages, this.EnsureThread(context), cancellationToken: cancellationToken); - List updates = []; + List updates = []; - await foreach (AgentRunResponseUpdate update in agentStream.ConfigureAwait(false)) - { - if (emitEvents ?? this._emitEvents) + await foreach (AgentRunResponseUpdate update in agentStream.ConfigureAwait(false)) { await context.AddEventAsync(new AgentRunUpdateEvent(this.Id, update), cancellationToken).ConfigureAwait(false); + + // TODO: FunctionCall request handling, and user info request handling. + // In some sense: We should just let it be handled as a ChatMessage, though we should consider + // providing some mechanisms to help the user complete the request, or route it out of the + // workflow. + updates.Add(update); } - // TODO: FunctionCall request handling, and user info request handling. - // In some sense: We should just let it be handled as a ChatMessage, though we should consider - // providing some mechanisms to help the user complete the request, or route it out of the - // workflow. - updates.Add(update); + await context.SendMessageAsync(updates.ToAgentRunResponse().Messages, cancellationToken: cancellationToken).ConfigureAwait(false); + } + else + { + // Otherwise, run the agent in non-streaming mode. + AgentRunResponse response = await this._agent.RunAsync(messages, this.EnsureThread(context), cancellationToken: cancellationToken).ConfigureAwait(false); + await context.SendMessageAsync(response.Messages, cancellationToken: cancellationToken).ConfigureAwait(false); } - - await context.SendMessageAsync(updates.ToAgentRunResponse().Messages, cancellationToken: cancellationToken).ConfigureAwait(false); } } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs index 54e14d5f28..e34257190e 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs @@ -170,7 +170,7 @@ public async Task Test_AIAgentStreamingMessage_AggregationAsync() TestWorkflowContext collectingContext = new(); - await host.TakeTurnAsync(new TurnToken(emitEvents: false), collectingContext); + await host.TakeTurnAsync(new TurnToken(emitEvents: true), collectingContext); // The first empty message is skipped. collectingContext.Updates.Should().HaveCount(MessageStrings.Length - 1);