-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Closed
Labels
.NET · v1.0 (Features being tracked for the version 1.0 GA) · workflows (Related to Workflows in agent-framework)
Description
Scenario
- Set a Shared State object from any executor in the main workflow.
- Create a subworkflow, then try to read the Shared State object from any executor inside that workflow.
- Reading the Shared State object returns null.
Example
/// <summary>
/// Reproduction sample: runs the same three executors once as a single flat workflow
/// and once with the trimming step wrapped in a sub-workflow, printing the events
/// each run produces so the two outcomes can be compared.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point. Executes the flat workflow first, then the sub-workflow variant,
    /// each with a fresh set of executor instances.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static async Task Main(string[] args)
    {
        var text = " Lorem ipsum dolor sit amet, consectetur adipiscing elit. ";

        // Runs successfully
        await OneWorkflow(text, new TextReadExecutor(), new TextTrimExecutor(), new CharCountingExecutor());

        // Fails since shared state is not preserved across sub-workflow boundary
        await WithSubWorkflow(text, new TextReadExecutor(), new TextTrimExecutor(), new CharCountingExecutor());
    }

    /// <summary>
    /// Chains textRead -> textTrim -> charCount in one workflow, runs it with
    /// <paramref name="text"/> as input and prints the resulting events.
    /// </summary>
    /// <param name="text">Input handed to the workflow's starting executor.</param>
    /// <param name="textRead">Starting executor of the workflow.</param>
    /// <param name="textTrim">Executor placed directly after <paramref name="textRead"/>.</param>
    /// <param name="charCount">Final executor; the workflow's output is taken from it.</param>
    private static async Task OneWorkflow(string text, TextReadExecutor textRead, TextTrimExecutor textTrim, CharCountingExecutor charCount)
    {
        Console.WriteLine("Running single workflow...");

        var workflow = new WorkflowBuilder(textRead)
            .AddEdge(textRead, textTrim)
            .AddEdge(textTrim, charCount)
            .WithOutputFrom(charCount)
            .Build();

        await using Run run = await InProcessExecution.RunAsync(workflow, text);

        // Uses the same reporting as the sub-workflow variant so that a failure
        // here is printed instead of being silently ignored.
        ReportEvents(run);
    }

    /// <summary>
    /// Same pipeline as <see cref="OneWorkflow"/>, except that <paramref name="textTrim"/>
    /// is built into its own workflow and bound into the outer workflow as an executor
    /// named "textTrim".
    /// </summary>
    /// <param name="text">Input handed to the outer workflow's starting executor.</param>
    /// <param name="textRead">Starting executor of the outer workflow.</param>
    /// <param name="textTrim">Sole executor of the sub-workflow.</param>
    /// <param name="charCount">Final executor; the outer workflow's output is taken from it.</param>
    private static async Task WithSubWorkflow(string text, TextReadExecutor textRead, TextTrimExecutor textTrim, CharCountingExecutor charCount)
    {
        Console.WriteLine("Running workflow with sub-workflow...");

        var subWorkflow = new WorkflowBuilder(textTrim)
            .Build();
        var subWorkflowStep = subWorkflow.BindAsExecutor("textTrim");

        var workflow = new WorkflowBuilder(textRead)
            .AddEdge(textRead, subWorkflowStep)
            .AddEdge(subWorkflowStep, charCount)
            .WithOutputFrom(charCount)
            .Build();

        await using Run run = await InProcessExecution.RunAsync(workflow, text);

        ReportEvents(run);
    }

    /// <summary>
    /// Writes every output event's data, and every error event's data prefixed with
    /// "Workflow failed: ", to the console. Other event kinds are ignored.
    /// </summary>
    /// <param name="run">The run whose new events are enumerated.</param>
    private static void ReportEvents(Run run)
    {
        foreach (WorkflowEvent evt in run.NewEvents)
        {
            // Case order mirrors the original if / else-if chain: output is checked first.
            switch (evt)
            {
                case WorkflowOutputEvent outputEvent:
                    Console.WriteLine(outputEvent.Data);
                    break;

                case WorkflowErrorEvent failedEvent:
                    Console.WriteLine($"Workflow failed: {failedEvent.Data}");
                    break;
            }
        }
    }
}
/// <summary>
/// Names shared by the executors in this sample.
/// </summary>
internal static class Constants
{
    /// <summary>
    /// Scope name passed as <c>scopeName</c> on every shared-state read and queued update
    /// made by the executors in this sample.
    /// </summary>
    public const string WordStateScope = "WordStateScope";
}
/// <summary>
/// Executor that queues the incoming text as a shared-state update and forwards
/// the key under which it was queued.
/// </summary>
internal sealed class TextReadExecutor() : Executor<string, string>("TextReadExecutor")
{
    /// <summary>
    /// Queues <paramref name="word"/> as a state update in the
    /// <see cref="Constants.WordStateScope"/> scope under a newly generated GUID key.
    /// </summary>
    /// <param name="word">The text to place in shared state.</param>
    /// <param name="context">Workflow context used to queue the state update.</param>
    /// <param name="cancellationToken">Token forwarded to the state update call.</param>
    /// <returns>The generated key, as a string.</returns>
    public override async ValueTask<string> HandleAsync(string word, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // A fresh GUID per message serves as the state key handed to the next executor.
        Guid stateId = Guid.NewGuid();
        string stateKey = stateId.ToString();

        await context.QueueStateUpdateAsync(
            stateKey,
            word,
            scopeName: Constants.WordStateScope,
            cancellationToken);

        return stateKey;
    }
}
/// <summary>
/// Executor that reads the text stored under a key, trims it, queues the trimmed
/// text back under the same key, and forwards the key.
/// </summary>
internal sealed class TextTrimExecutor() : Executor<string, string>("TextTrimExecutor")
{
    /// <summary>
    /// Reads the string stored under <paramref name="key"/> in the
    /// <see cref="Constants.WordStateScope"/> scope, trims it, and queues the trimmed
    /// value as an update for the same key and scope.
    /// </summary>
    /// <param name="key">State key identifying the text to trim.</param>
    /// <param name="context">Workflow context used to read and update state.</param>
    /// <param name="cancellationToken">Token forwarded to the state calls.</param>
    /// <returns>The unchanged <paramref name="key"/>.</returns>
    /// <exception cref="InvalidOperationException">The state read returned null.</exception>
    public override async ValueTask<string> HandleAsync(string key, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        var stored = await context.ReadStateAsync<string>(key, scopeName: Constants.WordStateScope, cancellationToken);

        // Guard clause: nothing to trim when no value is available for this key.
        if (stored is null)
        {
            throw new InvalidOperationException("Word state not found");
        }

        var withoutPadding = stored.Trim();

        await context.QueueStateUpdateAsync(
            key,
            withoutPadding,
            scopeName: Constants.WordStateScope,
            cancellationToken);

        return key;
    }
}
/// <summary>
/// Executor that reports the length of the text stored under a key.
/// </summary>
internal sealed class CharCountingExecutor() : Executor<string, int>("CharCountingExecutor")
{
    /// <summary>
    /// Reads the string stored under the key <paramref name="message"/> in the
    /// <see cref="Constants.WordStateScope"/> scope and returns its length.
    /// </summary>
    /// <param name="message">State key identifying the text to measure.</param>
    /// <param name="context">Workflow context used to read state.</param>
    /// <param name="cancellationToken">Token forwarded to the state read.</param>
    /// <returns>The number of characters in the stored text, or 0 when the read returns null.</returns>
    public override async ValueTask<int> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        var stored = await context.ReadStateAsync<string>(message, scopeName: Constants.WordStateScope, cancellationToken);

        // A missing value is reported as zero characters rather than as an error.
        return stored is null ? 0 : stored.Length;
    }
}
- Version: 1.0.0-preview.251114.1
Metadata
Metadata
Assignees
Labels
.NET · v1.0 (Features being tracked for the version 1.0 GA) · workflows (Related to Workflows in agent-framework)
Type
Projects
Status
Done