Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ._workflow import Workflow

from ._checkpoint_encoding import decode_checkpoint_value, encode_checkpoint_value
from ._const import WORKFLOW_RUN_KWARGS_KEY
from ._events import (
RequestInfoEvent,
WorkflowErrorEvent,
Expand Down Expand Up @@ -366,8 +367,11 @@ async def process_workflow(self, input_data: object, ctx: WorkflowContext[Any])
logger.debug(f"WorkflowExecutor {self.id} starting sub-workflow {self.workflow.id} execution {execution_id}")

try:
# Run the sub-workflow and collect all events
result = await self.workflow.run(input_data)
# Get kwargs from parent workflow's SharedState to propagate to subworkflow
parent_kwargs: dict[str, Any] = await ctx.get_shared_state(WORKFLOW_RUN_KWARGS_KEY) or {}

# Run the sub-workflow and collect all events, passing parent kwargs
result = await self.workflow.run(input_data, **parent_kwargs)

logger.debug(
f"WorkflowExecutor {self.id} sub-workflow {self.workflow.id} "
Expand Down
152 changes: 152 additions & 0 deletions python/packages/core/tests/workflow/test_workflow_kwargs.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,3 +490,155 @@ async def prepare_final_answer(self, context: MagenticContext) -> ChatMessage:


# endregion


# region SubWorkflow (WorkflowExecutor) Tests


async def test_subworkflow_kwargs_propagation() -> None:
"""Test that kwargs are propagated to subworkflows.

Verifies kwargs passed to parent workflow.run_stream() flow through to agents
in subworkflows wrapped by WorkflowExecutor.
"""
from agent_framework._workflows._workflow_executor import WorkflowExecutor

# Create an agent inside the subworkflow that captures kwargs
inner_agent = _KwargsCapturingAgent(name="inner_agent")

# Build the inner (sub) workflow with the agent
inner_workflow = SequentialBuilder().participants([inner_agent]).build()

# Wrap the inner workflow in a WorkflowExecutor so it can be used as a subworkflow
subworkflow_executor = WorkflowExecutor(workflow=inner_workflow, id="subworkflow_executor")

# Build the outer (parent) workflow containing the subworkflow
outer_workflow = SequentialBuilder().participants([subworkflow_executor]).build()

# Define kwargs that should propagate to subworkflow
custom_data = {"api_key": "secret123", "endpoint": "https://api.example.com"}
user_token = {"user_name": "alice", "access_level": "admin"}

# Run the outer workflow with kwargs
async for event in outer_workflow.run_stream(
"test message for subworkflow",
custom_data=custom_data,
user_token=user_token,
):
if isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE:
break

# Verify that the inner agent was called
assert len(inner_agent.captured_kwargs) >= 1, "Inner agent in subworkflow should have been invoked"

received_kwargs = inner_agent.captured_kwargs[0]

# Verify kwargs were propagated from parent workflow to subworkflow agent
assert "custom_data" in received_kwargs, (
f"Subworkflow agent should receive 'custom_data' kwarg. Received keys: {list(received_kwargs.keys())}"
)
assert "user_token" in received_kwargs, (
f"Subworkflow agent should receive 'user_token' kwarg. Received keys: {list(received_kwargs.keys())}"
)
assert received_kwargs.get("custom_data") == custom_data, (
f"Expected custom_data={custom_data}, got {received_kwargs.get('custom_data')}"
)
assert received_kwargs.get("user_token") == user_token, (
f"Expected user_token={user_token}, got {received_kwargs.get('user_token')}"
)


async def test_subworkflow_kwargs_accessible_via_shared_state() -> None:
"""Test that kwargs are accessible via SharedState within subworkflow.

Verifies that WORKFLOW_RUN_KWARGS_KEY is populated in the subworkflow's SharedState
with kwargs from the parent workflow.
"""
from agent_framework import Executor, WorkflowContext, handler
from agent_framework._workflows._workflow_executor import WorkflowExecutor

captured_kwargs_from_state: list[dict[str, Any]] = []

class _SharedStateReader(Executor):
"""Executor that reads kwargs from SharedState for verification."""

@handler
async def read_kwargs(self, msgs: list[ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None:
kwargs_from_state = await ctx.get_shared_state(WORKFLOW_RUN_KWARGS_KEY)
captured_kwargs_from_state.append(kwargs_from_state or {})
await ctx.send_message(msgs)

# Build inner workflow with SharedState reader
state_reader = _SharedStateReader(id="state_reader")
inner_workflow = SequentialBuilder().participants([state_reader]).build()

# Wrap as subworkflow
subworkflow_executor = WorkflowExecutor(workflow=inner_workflow, id="subworkflow")

# Build outer workflow
outer_workflow = SequentialBuilder().participants([subworkflow_executor]).build()

# Run with kwargs
async for event in outer_workflow.run_stream(
"test",
my_custom_kwarg="should_be_propagated",
another_kwarg=42,
):
if isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE:
break

# Verify the state reader was invoked
assert len(captured_kwargs_from_state) >= 1, "SharedState reader should have been invoked"

kwargs_in_subworkflow = captured_kwargs_from_state[0]

assert kwargs_in_subworkflow.get("my_custom_kwarg") == "should_be_propagated", (
f"Expected 'my_custom_kwarg' in subworkflow SharedState, got: {kwargs_in_subworkflow}"
)
assert kwargs_in_subworkflow.get("another_kwarg") == 42, (
f"Expected 'another_kwarg'=42 in subworkflow SharedState, got: {kwargs_in_subworkflow}"
)


async def test_nested_subworkflow_kwargs_propagation() -> None:
"""Test kwargs propagation through multiple levels of nested subworkflows.

Verifies kwargs flow through 3 levels:
- Outer workflow
- Middle subworkflow (WorkflowExecutor)
- Inner subworkflow (WorkflowExecutor) with agent
"""
from agent_framework._workflows._workflow_executor import WorkflowExecutor

# Innermost agent
inner_agent = _KwargsCapturingAgent(name="deeply_nested_agent")

# Build inner workflow
inner_workflow = SequentialBuilder().participants([inner_agent]).build()
inner_executor = WorkflowExecutor(workflow=inner_workflow, id="inner_executor")

# Build middle workflow containing inner
middle_workflow = SequentialBuilder().participants([inner_executor]).build()
middle_executor = WorkflowExecutor(workflow=middle_workflow, id="middle_executor")

# Build outer workflow containing middle
outer_workflow = SequentialBuilder().participants([middle_executor]).build()

# Run with kwargs
async for event in outer_workflow.run_stream(
"deeply nested test",
deep_kwarg="should_reach_inner",
):
if isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE:
break

# Verify inner agent was called
assert len(inner_agent.captured_kwargs) >= 1, "Deeply nested agent should be invoked"

received = inner_agent.captured_kwargs[0]
assert received.get("deep_kwarg") == "should_reach_inner", (
f"Deeply nested agent should receive 'deep_kwarg'. Got: {received}"
)


# endregion
1 change: 1 addition & 0 deletions python/samples/getting_started/workflows/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Once comfortable with these, explore the rest of the samples below.
| Sub-Workflow (Basics) | [composition/sub_workflow_basics.py](./composition/sub_workflow_basics.py) | Wrap a workflow as an executor and orchestrate sub-workflows |
| Sub-Workflow: Request Interception | [composition/sub_workflow_request_interception.py](./composition/sub_workflow_request_interception.py) | Intercept and forward sub-workflow requests using @handler for SubWorkflowRequestMessage |
| Sub-Workflow: Parallel Requests | [composition/sub_workflow_parallel_requests.py](./composition/sub_workflow_parallel_requests.py) | Multiple specialized interceptors handling different request types from same sub-workflow |
| Sub-Workflow: kwargs Propagation | [composition/sub_workflow_kwargs.py](./composition/sub_workflow_kwargs.py) | Pass custom context (user tokens, config) from parent workflow through to sub-workflow agents |

### control-flow

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# Copyright (c) Microsoft. All rights reserved.

import asyncio
import json
from typing import Annotated, Any

from agent_framework import (
ChatMessage,
SequentialBuilder,
WorkflowExecutor,
WorkflowOutputEvent,
ai_function,
)
from agent_framework.openai import OpenAIChatClient

"""
Sample: Sub-Workflow kwargs Propagation

This sample demonstrates how custom context (kwargs) flows from a parent workflow
through to agents in sub-workflows. When you pass kwargs to the parent workflow's
run_stream() or run(), they automatically propagate to nested sub-workflows.

Key Concepts:
- kwargs passed to parent workflow.run_stream() propagate to sub-workflows
- Sub-workflow agents receive the same kwargs as the parent workflow
- Works with nested WorkflowExecutor compositions at any depth
- Useful for passing authentication tokens, configuration, or request context

Prerequisites:
- OpenAI environment variables configured
"""


# Define tools that access custom context via **kwargs
@ai_function
def get_authenticated_data(
resource: Annotated[str, "The resource to fetch"],
**kwargs: Any,
) -> str:
"""Fetch data using the authenticated user context from kwargs."""
user_token = kwargs.get("user_token", {})
user_name = user_token.get("user_name", "anonymous")
access_level = user_token.get("access_level", "none")

print(f"\n[get_authenticated_data] kwargs keys: {list(kwargs.keys())}")
print(f"[get_authenticated_data] User: {user_name}, Access: {access_level}")

return f"Fetched '{resource}' for user {user_name} ({access_level} access)"


@ai_function
def call_configured_service(
service_name: Annotated[str, "Name of the service to call"],
**kwargs: Any,
) -> str:
"""Call a service using configuration from kwargs."""
config = kwargs.get("service_config", {})
services = config.get("services", {})

print(f"\n[call_configured_service] kwargs keys: {list(kwargs.keys())}")
print(f"[call_configured_service] Available services: {list(services.keys())}")

if service_name in services:
endpoint = services[service_name]
return f"Called service '{service_name}' at {endpoint}"
return f"Service '{service_name}' not found in configuration"


async def main() -> None:
print("=" * 70)
print("Sub-Workflow kwargs Propagation Demo")
print("=" * 70)

# Create chat client
chat_client = OpenAIChatClient()

# Create an agent with tools that use kwargs
inner_agent = chat_client.create_agent(
name="data_agent",
instructions=(
"You are a data access agent. Use the available tools to help users. "
"When asked to fetch data, use get_authenticated_data. "
"When asked to call a service, use call_configured_service."
),
tools=[get_authenticated_data, call_configured_service],
)

# Build the inner (sub) workflow with the agent
inner_workflow = SequentialBuilder().participants([inner_agent]).build()

# Wrap the inner workflow in a WorkflowExecutor to use it as a sub-workflow
subworkflow_executor = WorkflowExecutor(
workflow=inner_workflow,
id="data_subworkflow",
)

# Build the outer (parent) workflow containing the sub-workflow
outer_workflow = SequentialBuilder().participants([subworkflow_executor]).build()

# Define custom context that will flow through to the sub-workflow's agent
user_token = {
"user_name": "alice@contoso.com",
"access_level": "admin",
"session_id": "sess_12345",
}

service_config = {
"services": {
"users": "https://api.example.com/v1/users",
"orders": "https://api.example.com/v1/orders",
"inventory": "https://api.example.com/v1/inventory",
},
"timeout": 30,
}

print("\nContext being passed to parent workflow:")
print(f" user_token: {json.dumps(user_token, indent=4)}")
print(f" service_config: {json.dumps(service_config, indent=4)}")
print("\n" + "-" * 70)
print("Workflow Execution (kwargs flow: parent -> sub-workflow -> agent -> tool):")
print("-" * 70)

# Run the OUTER workflow with kwargs
# These kwargs will automatically propagate to the inner sub-workflow
async for event in outer_workflow.run_stream(
"Please fetch my profile data and then call the users service.",
user_token=user_token,
service_config=service_config,
):
if isinstance(event, WorkflowOutputEvent):
output_data = event.data
if isinstance(output_data, list):
for item in output_data: # type: ignore
if isinstance(item, ChatMessage) and item.text:
print(f"\n[Final Answer]: {item.text}")

print("\n" + "=" * 70)
print("Sample Complete - kwargs successfully flowed through sub-workflow!")
print("=" * 70)


if __name__ == "__main__":
asyncio.run(main())
Loading
Loading