From 95df15db31f2ddcc5430fac7376d836dbd2de352 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Tue, 9 Dec 2025 16:41:07 -0800 Subject: [PATCH 01/12] Add factory pattern to concurrent orchestration builder --- .../agent_framework/_workflows/_concurrent.py | 165 ++++++++++++++--- .../core/tests/workflow/test_concurrent.py | 172 +++++++++++++++++- .../concurrent_custom_aggregator.py | 2 +- .../concurrent_participant_factories.py | 172 ++++++++++++++++++ 4 files changed, 487 insertions(+), 24 deletions(-) create mode 100644 python/samples/getting_started/workflows/orchestration/concurrent_participant_factories.py diff --git a/python/packages/core/agent_framework/_workflows/_concurrent.py b/python/packages/core/agent_framework/_workflows/_concurrent.py index 6b3e1ac05e..437c33ecd7 100644 --- a/python/packages/core/agent_framework/_workflows/_concurrent.py +++ b/python/packages/core/agent_framework/_workflows/_concurrent.py @@ -188,8 +188,10 @@ class ConcurrentBuilder: r"""High-level builder for concurrent agent workflows. - `participants([...])` accepts a list of AgentProtocol (recommended) or Executor. + - `register_participants([...])` accepts a list of factories for AgentProtocol (recommended) + or Executor factories - `build()` wires: dispatcher -> fan-out -> participants -> fan-in -> aggregator. - - `with_custom_aggregator(...)` overrides the default aggregator with an Executor or callback. + - `with_aggregator(...)` overrides the default aggregator with an Executor or callback. Usage: @@ -200,14 +202,17 @@ class ConcurrentBuilder: # Minimal: use default aggregator (returns list[ChatMessage]) workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).build() + # With agent factories + workflow = ConcurrentBuilder().register_participants([create_agent1, create_agent2, create_agent3]).build() + # Custom aggregator via callback (sync or async). The callback receives # list[AgentExecutorResponse] and its return value becomes the workflow's output. - def summarize(results): + def summarize(results: list[AgentExecutorResponse]) -> str: return " | ".join(r.agent_run_response.messages[-1].text for r in results) - workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_custom_aggregator(summarize).build() + workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_aggregator(summarize).build() # Enable checkpoint persistence so runs can resume @@ -216,9 +221,65 @@ def summarize(results): def __init__(self) -> None: self._participants: list[AgentProtocol | Executor] = [] - self._aggregator: Executor | None = None + self._participant_factories: list[Callable[[], AgentProtocol | Executor]] = [] + self._aggregator: ( + Executor + | Callable[[], Executor] + | Callable[[list[AgentExecutorResponse]], Any] + | Callable[[list[AgentExecutorResponse], WorkflowContext[Never, Any]], Any] + | None + ) = None self._checkpoint_storage: CheckpointStorage | None = None + def register_participants( + self, + participant_factories: Sequence[Callable[[], AgentProtocol | Executor]], + ) -> "ConcurrentBuilder": + r"""Define the parallel participants for this concurrent workflow. + + Accepts AgentProtocol instances (e.g., created by a chat client) or Executor + factories. Each participant created by a factory is wired as a parallel branch + using fan-out edges from an internal dispatcher. + + Raises: + ValueError: if `participant_factories` is empty, contains duplicates, or `.participants()` was called + TypeError: if any entry is not AgentProtocol or Executor + + Example: + + .. code-block:: python + + def create_researcher() -> ChatAgent: + return ... + + + def create_marketer() -> ChatAgent: + return ... + + + def create_legal() -> ChatAgent: + return ... + + + class MyCustomExecutor(Executor): ... + + + wf = ConcurrentBuilder().register_participants([create_researcher, create_marketer, create_legal]).build() + + # Mixing agent(s) and executor(s) is supported + wf2 = ConcurrentBuilder().register_participants([create_researcher, my_custom_executor]).build() + """ + if self._participants: + raise ValueError( + "Cannot mix .participants([...]) and .register_participants() in the same builder instance." + ) + + if not participant_factories: + raise ValueError("participant_factories cannot be empty") + + self._participant_factories = list(participant_factories) + return self + def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "ConcurrentBuilder": r"""Define the parallel participants for this concurrent workflow. @@ -227,7 +288,7 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Con from an internal dispatcher. Raises: - ValueError: if `participants` is empty or contains duplicates + ValueError: if `participants` is empty, contains duplicates, or `.register_participants()` was called TypeError: if any entry is not AgentProtocol or Executor Example: @@ -239,6 +300,11 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Con # Mixing agent(s) and executor(s) is supported wf2 = ConcurrentBuilder().participants([researcher_agent, my_custom_executor]).build() """ + if self._participant_factories: + raise ValueError( + "Cannot mix .participants([...]) and .register_participants() in the same builder instance." + ) + if not participants: raise ValueError("participants cannot be empty") @@ -261,12 +327,18 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Con self._participants = list(participants) return self - def with_aggregator(self, aggregator: Executor | Callable[..., Any]) -> "ConcurrentBuilder": - r"""Override the default aggregator with an Executor or a callback. - - - Executor: must handle `list[AgentExecutorResponse]` and - yield output using `ctx.yield_output(...)` and add a - output and the workflow becomes idle. + def with_aggregator( + self, + aggregator: Executor + | Callable[[], Executor] + | Callable[[list[AgentExecutorResponse]], Any] + | Callable[[list[AgentExecutorResponse], WorkflowContext[Never, Any]], Any], + ) -> "ConcurrentBuilder": + r"""Override the default aggregator with an executor, an executor factory, or a callback. + + - Executor: must handle `list[AgentExecutorResponse]` and yield output using `ctx.yield_output(...)` + - Executor factory: callable returning an Executor instance that handles `list[AgentExecutorResponse]` + and yields output using `ctx.yield_output(...)` - Callback: sync or async callable with one of the signatures: `(results: list[AgentExecutorResponse]) -> Any | None` or `(results: list[AgentExecutorResponse], ctx: WorkflowContext) -> Any | None`. @@ -275,20 +347,44 @@ def with_aggregator(self, aggregator: Executor | Callable[..., Any]) -> "Concurr Example: .. code-block:: python + # Executor-based aggregator + class CustomAggregator(Executor): + @handler + async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext) -> None: + await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results)) + + + wf = ConcurrentBuilder().participants([a1, a2, a3]).with_aggregator(CustomAggregator()).build() + + # Factory-based aggregator + wf = ( + ConcurrentBuilder() + .participants([a1, a2, a3]) + .with_aggregator(lambda: CustomAggregator(id="custom_aggregator")) + .build() + ) + # Callback-based aggregator (string result) - async def summarize(results): + async def summarize(results: list[AgentExecutorResponse]) -> str: return " | ".join(r.agent_run_response.messages[-1].text for r in results) - wf = ConcurrentBuilder().participants([a1, a2, a3]).with_custom_aggregator(summarize).build() + wf = ConcurrentBuilder().participants([a1, a2, a3]).with_aggregator(summarize).build() + + + # Callback-based aggregator (yield result) + async def summarize(results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None: + await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results)) + + + wf = ConcurrentBuilder().participants([a1, a2, a3]).with_aggregator(summarize).build() """ - if isinstance(aggregator, Executor): + if isinstance(aggregator, Executor) or callable(aggregator): self._aggregator = aggregator - elif callable(aggregator): - self._aggregator = _CallbackAggregator(aggregator) else: raise TypeError("aggregator must be an Executor or a callable") + return self def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "ConcurrentBuilder": @@ -304,7 +400,7 @@ def build(self) -> Workflow: - Fan-in aggregator collects `AgentExecutorResponse` objects - Aggregator yields output and the workflow becomes idle. The output is either: - list[ChatMessage] (default aggregator: one user + one assistant per agent) - - custom payload from the provided callback/executor + - custom payload from the provided aggregator Returns: Workflow: a ready-to-run workflow instance @@ -318,16 +414,41 @@ def build(self) -> Workflow: workflow = ConcurrentBuilder().participants([agent1, agent2]).build() """ - if not self._participants: - raise ValueError("No participants provided. Call .participants([...]) first.") + if not self._participants and not self._participant_factories: + raise ValueError( + "No participants provided. Call .participants([...]) or .register_participants([...]) first." + ) + # Internal nodes dispatcher = _DispatchToAllParticipants(id="dispatcher") - aggregator = self._aggregator or _AggregateAgentConversations(id="aggregator") + if isinstance(self._aggregator, Executor): + # Case 1: Executor instance - use directly + aggregator = self._aggregator + elif callable(self._aggregator): + # Distinguish between an aggregator factory and callback-based aggregator + # by checking the signature: factory has 0 params, callback has 1-2 params + sig = inspect.signature(self._aggregator) + param_count = len(sig.parameters) + + # Case 2: Executor factory (no parameters) - call it to create the executor + # Case 3: Callback with parameters (1-2 params) - wrap in _CallbackAggregator + aggregator = self._aggregator() if param_count == 0 else _CallbackAggregator(self._aggregator) # type: ignore + else: + # Case 4: No custom aggregator provided - use the default one + aggregator = _AggregateAgentConversations(id="aggregator") + + participants: list[Executor | AgentProtocol] = [] + if self._participant_factories: + for factory in self._participant_factories: + p = factory() + participants.append(p) + else: + participants = self._participants builder = WorkflowBuilder() builder.set_start_executor(dispatcher) - builder.add_fan_out_edges(dispatcher, list(self._participants)) - builder.add_fan_in_edges(list(self._participants), aggregator) + builder.add_fan_out_edges(dispatcher, list(participants)) + builder.add_fan_in_edges(list(participants), aggregator) if self._checkpoint_storage is not None: builder = builder.with_checkpointing(self._checkpoint_storage) diff --git a/python/packages/core/tests/workflow/test_concurrent.py b/python/packages/core/tests/workflow/test_concurrent.py index db70be3f38..e30cd3162c 100644 --- a/python/packages/core/tests/workflow/test_concurrent.py +++ b/python/packages/core/tests/workflow/test_concurrent.py @@ -1,6 +1,6 @@ # Copyright (c) Microsoft. All rights reserved. -from typing import Any, cast +from typing import Any, Never, cast import pytest @@ -159,6 +159,114 @@ def summarize(results: list[AgentExecutorResponse]) -> str: # type: ignore[over assert aggregator.id == "summarize" +async def test_concurrent_with_aggregator_executor_instance() -> None: + """Test with_aggregator using an Executor instance (not factory).""" + + class CustomAggregator(Executor): + @handler + async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None: + texts: list[str] = [] + for r in results: + msgs: list[ChatMessage] = r.agent_run_response.messages + texts.append(msgs[-1].text if msgs else "") + await ctx.yield_output(" & ".join(sorted(texts))) + + e1 = _FakeAgentExec("agentA", "One") + e2 = _FakeAgentExec("agentB", "Two") + + aggregator_instance = CustomAggregator(id="instance_aggregator") + wf = ConcurrentBuilder().participants([e1, e2]).with_aggregator(aggregator_instance).build() + + completed = False + output: str | None = None + async for ev in wf.run_stream("prompt: instance test"): + if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: + completed = True + elif isinstance(ev, WorkflowOutputEvent): + output = cast(str, ev.data) + if completed and output is not None: + break + + assert completed + assert output is not None + assert isinstance(output, str) + assert output == "One & Two" + + +async def test_concurrent_with_aggregator_executor_factory() -> None: + """Test with_aggregator using an Executor factory.""" + + class CustomAggregator(Executor): + @handler + async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None: + texts: list[str] = [] + for r in results: + msgs: list[ChatMessage] = r.agent_run_response.messages + texts.append(msgs[-1].text if msgs else "") + await ctx.yield_output(" | ".join(sorted(texts))) + + e1 = _FakeAgentExec("agentA", "One") + e2 = _FakeAgentExec("agentB", "Two") + + wf = ( + ConcurrentBuilder() + .participants([e1, e2]) + .with_aggregator(lambda: CustomAggregator(id="custom_aggregator")) + .build() + ) + + completed = False + output: str | None = None + async for ev in wf.run_stream("prompt: factory test"): + if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: + completed = True + elif isinstance(ev, WorkflowOutputEvent): + output = cast(str, ev.data) + if completed and output is not None: + break + + assert completed + assert output is not None + assert isinstance(output, str) + assert output == "One | Two" + + +async def test_concurrent_with_aggregator_executor_factory_with_default_id() -> None: + """Test with_aggregator using an Executor factory.""" + + class CustomAggregator(Executor): + def __init__(self, id: str = "default_aggregator") -> None: + super().__init__(id) + + @handler + async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None: + texts: list[str] = [] + for r in results: + msgs: list[ChatMessage] = r.agent_run_response.messages + texts.append(msgs[-1].text if msgs else "") + await ctx.yield_output(" | ".join(sorted(texts))) + + e1 = _FakeAgentExec("agentA", "One") + e2 = _FakeAgentExec("agentB", "Two") + + wf = ConcurrentBuilder().participants([e1, e2]).with_aggregator(CustomAggregator).build() + + completed = False + output: str | None = None + async for ev in wf.run_stream("prompt: factory test"): + if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: + completed = True + elif isinstance(ev, WorkflowOutputEvent): + output = cast(str, ev.data) + if completed and output is not None: + break + + assert completed + assert output is not None + assert isinstance(output, str) + assert output == "One | Two" + + async def test_concurrent_checkpoint_resume_round_trip() -> None: storage = InMemoryCheckpointStorage() @@ -278,3 +386,65 @@ async def test_concurrent_checkpoint_runtime_overrides_buildtime() -> None: assert len(runtime_checkpoints) > 0, "Runtime storage should have checkpoints" assert len(buildtime_checkpoints) == 0, "Build-time storage should have no checkpoints when overridden" + + +def test_concurrent_builder_rejects_empty_participant_factories() -> None: + with pytest.raises(ValueError): + ConcurrentBuilder().register_participants([]) + + +def test_concurrent_builder_rejects_mixing_participants_and_factories() -> None: + """Test that mixing .participants() and .register_participants() raises an error.""" + # Case 1: participants first, then register_participants + with pytest.raises(ValueError, match="Cannot mix .participants"): + ( + ConcurrentBuilder() + .participants([_FakeAgentExec("a", "A")]) + .register_participants([lambda: _FakeAgentExec("b", "B")]) + ) + + # Case 2: register_participants first, then participants + with pytest.raises(ValueError, match="Cannot mix .participants"): + ( + ConcurrentBuilder() + .register_participants([lambda: _FakeAgentExec("a", "A")]) + .participants([_FakeAgentExec("b", "B")]) + ) + + +async def test_concurrent_with_register_participants() -> None: + """Test workflow creation using register_participants with factories.""" + + def create_agent1() -> Executor: + return _FakeAgentExec("agentA", "Alpha") + + def create_agent2() -> Executor: + return _FakeAgentExec("agentB", "Beta") + + def create_agent3() -> Executor: + return _FakeAgentExec("agentC", "Gamma") + + wf = ConcurrentBuilder().register_participants([create_agent1, create_agent2, create_agent3]).build() + + completed = False + output: list[ChatMessage] | None = None + async for ev in wf.run_stream("test prompt"): + if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: + completed = True + elif isinstance(ev, WorkflowOutputEvent): + output = cast(list[ChatMessage], ev.data) + if completed and output is not None: + break + + assert completed + assert output is not None + messages: list[ChatMessage] = output + + # Expect one user message + one assistant message per participant + assert len(messages) == 1 + 3 + assert messages[0].role == Role.USER + assert "test prompt" in messages[0].text + + assistant_texts = {m.text for m in messages[1:]} + assert assistant_texts == {"Alpha", "Beta", "Gamma"} + assert all(m.role == Role.ASSISTANT for m in messages[1:]) diff --git a/python/samples/getting_started/workflows/orchestration/concurrent_custom_aggregator.py b/python/samples/getting_started/workflows/orchestration/concurrent_custom_aggregator.py index 4ad8c9fcb3..44f71ba7bc 100644 --- a/python/samples/getting_started/workflows/orchestration/concurrent_custom_aggregator.py +++ b/python/samples/getting_started/workflows/orchestration/concurrent_custom_aggregator.py @@ -17,7 +17,7 @@ The workflow completes when all participants become idle. Demonstrates: -- ConcurrentBuilder().participants([...]).with_custom_aggregator(callback) +- ConcurrentBuilder().participants([...]).with_aggregator(callback) - Fan-out to agents and fan-in at an aggregator - Aggregation implemented via an LLM call (chat_client.get_response) - Workflow output yielded with the synthesized summary string diff --git a/python/samples/getting_started/workflows/orchestration/concurrent_participant_factories.py b/python/samples/getting_started/workflows/orchestration/concurrent_participant_factories.py new file mode 100644 index 0000000000..ede448ce0b --- /dev/null +++ b/python/samples/getting_started/workflows/orchestration/concurrent_participant_factories.py @@ -0,0 +1,172 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +from typing import Any, Never + +from agent_framework import ( + ChatAgent, + ChatMessage, + ConcurrentBuilder, + Executor, + Role, + Workflow, + WorkflowContext, + handler, +) +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential + +""" +Sample: Concurrent Orchestration with participant factories and Custom Aggregator + +Build a concurrent workflow with ConcurrentBuilder that fans out one prompt to +multiple domain agents and fans in their responses. + +Override the default aggregator with a custom Executor class that uses +AzureOpenAIChatClient.get_response() to synthesize a concise, consolidated summary +from the experts' outputs. + +All participants and the aggregator are created via factory functions that return +their respective ChatAgent or Executor instances. + +Using participant factories allows you to set up proper state isolation between workflow +instances created by the same builder. This is particularly useful when you need to handle +requests or tasks in parallel with stateful participants. + +Demonstrates: +- ConcurrentBuilder().register_participants([...]).with_aggregator(callback) +- Fan-out to agents and fan-in at an aggregator +- Aggregation implemented via an LLM call (chat_client.get_response) +- Workflow output yielded with the synthesized summary string + +Prerequisites: +- Azure OpenAI configured for AzureOpenAIChatClient (az login + required env vars) +""" + + +def create_researcher() -> ChatAgent: + """Factory function to create a researcher agent instance.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=( + "You're an expert market and product researcher. Given a prompt, provide concise, factual insights," + " opportunities, and risks." + ), + name="researcher", + ) + + +def create_marketer() -> ChatAgent: + """Factory function to create a marketer agent instance.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=( + "You're a creative marketing strategist. Craft compelling value propositions and target messaging" + " aligned to the prompt." + ), + name="marketer", + ) + + +def create_legal() -> ChatAgent: + """Factory function to create a legal/compliance agent instance.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + instructions=( + "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns" + " based on the prompt." + ), + name="legal", + ) + + +class SummarizationExecutor(Executor): + """Custom aggregator executor that synthesizes expert outputs into a concise summary.""" + + def __init__(self): + super().__init__(id="summarization_executor") + self.chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) + + @handler + async def summarize_results(self, results: list[Any], ctx: WorkflowContext[Never, str]) -> None: + expert_sections: list[str] = [] + for r in results: + try: + messages = getattr(r.agent_run_response, "messages", []) + final_text = messages[-1].text if messages and hasattr(messages[-1], "text") else "(no content)" + expert_sections.append(f"{getattr(r, 'executor_id', 'expert')}:\n{final_text}") + except Exception as e: + expert_sections.append(f"{getattr(r, 'executor_id', 'expert')}: (error: {type(e).__name__}: {e})") + + # Ask the model to synthesize a concise summary of the experts' outputs + system_msg = ChatMessage( + Role.SYSTEM, + text=( + "You are a helpful assistant that consolidates multiple domain expert outputs " + "into one cohesive, concise summary with clear takeaways. Keep it under 200 words." + ), + ) + user_msg = ChatMessage(Role.USER, text="\n\n".join(expert_sections)) + + response = await self.chat_client.get_response([system_msg, user_msg]) + + await ctx.yield_output(response.messages[-1].text if response.messages else "") + + +async def run_workflow(workflow: Workflow, query: str) -> None: + events = await workflow.run(query) + outputs = events.get_outputs() + + if outputs: + print(outputs[0]) # Get the first (and typically only) output + else: + raise RuntimeError("No outputs received from the workflow.") + + +async def main() -> None: + # Create a concurrent builder with participant factories and a custom aggregator + # - register_participants([...]) accepts factory functions that return + # AgentProtocol (agents) or Executor instances. + # - with_aggregator(...) overrides the default aggregator: + # • Default aggregator -> returns list[ChatMessage] (one user + one assistant per agent) + # • Custom callback -> return value becomes workflow output (string here) + # • Custom Executor -> can yield outputs via ctx.yield_output(...) + concurrent_builder = ( + ConcurrentBuilder() + .register_participants([create_researcher, create_marketer, create_legal]) + .with_aggregator(SummarizationExecutor) + ) + + # Build workflow_a + workflow_a = concurrent_builder.build() + + # Run workflow_a + # Context is maintained across runs + print("=== First Run on workflow_a ===") + await run_workflow(workflow_a, "We are launching a new budget-friendly electric bike for urban commuters.") + print("\n=== Second Run on workflow_a ===") + await run_workflow(workflow_a, "Refine your response to focus on the California market.") + + # Build workflow_b + # This will create new instances of all participants and the aggregator + # The agents will also get new threads + workflow_b = concurrent_builder.build() + # Run workflow_b + # Context is not maintained across instances + # Should not expect mentions of electric bikes in the results + print("\n=== First Run on workflow_b ===") + await run_workflow(workflow_b, "Refine your response to focus on the California market.") + + """ + Sample Output: + + === First Run on workflow_a === + The budget-friendly electric bike market is poised for significant growth, driven by urbanization, ... + + === Second Run on workflow_a === + Launching a budget-friendly electric bike in California presents significant opportunities, driven ... + + === First Run on workflow_b === + To successfully penetrate the California market, consider these tailored strategies focused on ... + """ + + +if __name__ == "__main__": + asyncio.run(main()) From 065798c7c3c15cde6996bda90d9d51e0ca463939 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Wed, 10 Dec 2025 09:09:38 -0800 Subject: [PATCH 02/12] Update readme --- python/samples/getting_started/workflows/README.md | 1 + ...articipant_factories.py => concurrent_participant_factory.py} | 0 2 files changed, 1 insertion(+) rename python/samples/getting_started/workflows/orchestration/{concurrent_participant_factories.py => concurrent_participant_factory.py} (100%) diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index f22d993669..0e68969b91 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -97,6 +97,7 @@ For additional observability samples in Agent Framework, see the [observability | Concurrent Orchestration (Default Aggregator) | [orchestration/concurrent_agents.py](./orchestration/concurrent_agents.py) | Fan-out to multiple agents; fan-in with default aggregator returning combined ChatMessages | | Concurrent Orchestration (Custom Aggregator) | [orchestration/concurrent_custom_aggregator.py](./orchestration/concurrent_custom_aggregator.py) | Override aggregator via callback; summarize results with an LLM | | Concurrent Orchestration (Custom Agent Executors) | [orchestration/concurrent_custom_agent_executors.py](./orchestration/concurrent_custom_agent_executors.py) | Child executors own ChatAgents; concurrent fan-out/fan-in via ConcurrentBuilder | +| Concurrent Orchestration (Participant Factory) | [orchestration/concurrent_participant_factory.py](./orchestration/concurrent_participant_factory.py) | Use participant factories for state isolation between workflow instances | | Group Chat with Agent Manager | [orchestration/group_chat_agent_manager.py](./orchestration/group_chat_agent_manager.py) | Agent-based manager using `set_manager()` to select next speaker | | Group Chat Philosophical Debate | [orchestration/group_chat_philosophical_debate.py](./orchestration/group_chat_philosophical_debate.py) | Agent manager moderates long-form, multi-round debate across diverse participants | | Group Chat with Simple Function Selector | [orchestration/group_chat_simple_selector.py](./orchestration/group_chat_simple_selector.py) | Group chat with a simple function selector for next speaker | diff --git a/python/samples/getting_started/workflows/orchestration/concurrent_participant_factories.py b/python/samples/getting_started/workflows/orchestration/concurrent_participant_factory.py similarity index 100% rename from python/samples/getting_started/workflows/orchestration/concurrent_participant_factories.py rename to python/samples/getting_started/workflows/orchestration/concurrent_participant_factory.py From 7cb49b3e222cfaa8af2dccd2d86d305f7b5dad8a Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Wed, 10 Dec 2025 10:26:25 -0800 Subject: [PATCH 03/12] Address AI comments --- .../agent_framework/_workflows/_concurrent.py | 15 +++++++++------ .../core/tests/workflow/test_concurrent.py | 16 +++++++++++++++- .../concurrent_participant_factory.py | 2 +- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_concurrent.py b/python/packages/core/agent_framework/_workflows/_concurrent.py index 437c33ecd7..76c325130f 100644 --- a/python/packages/core/agent_framework/_workflows/_concurrent.py +++ b/python/packages/core/agent_framework/_workflows/_concurrent.py @@ -237,13 +237,12 @@ def register_participants( ) -> "ConcurrentBuilder": r"""Define the parallel participants for this concurrent workflow. - Accepts AgentProtocol instances (e.g., created by a chat client) or Executor - factories. Each participant created by a factory is wired as a parallel branch - using fan-out edges from an internal dispatcher. + Accepts factories (callables) that return AgentProtocol instances (e.g., created + by a chat client) or Executor instances. Each participant created by a factory + is wired as a parallel branch using fan-out edges from an internal dispatcher. Raises: - ValueError: if `participant_factories` is empty, contains duplicates, or `.participants()` was called - TypeError: if any entry is not AgentProtocol or Executor + ValueError: if `participant_factories` is empty or `.participants()` was called Example: @@ -267,7 +266,7 @@ class MyCustomExecutor(Executor): ... wf = ConcurrentBuilder().register_participants([create_researcher, create_marketer, create_legal]).build() # Mixing agent(s) and executor(s) is supported - wf2 = ConcurrentBuilder().register_participants([create_researcher, my_custom_executor]).build() + wf2 = ConcurrentBuilder().register_participants([create_researcher, MyCustomExecutor]).build() """ if self._participants: raise ValueError( @@ -441,6 +440,10 @@ def build(self) -> Workflow: if self._participant_factories: for factory in self._participant_factories: p = factory() + if not isinstance(p, (AgentProtocol, Executor)): + raise TypeError( + f"Participant factory must return AgentProtocol or Executor; got {type(p).__name__}" + ) participants.append(p) else: participants = self._participants diff --git a/python/packages/core/tests/workflow/test_concurrent.py b/python/packages/core/tests/workflow/test_concurrent.py index e30cd3162c..a35c3d5393 100644 --- a/python/packages/core/tests/workflow/test_concurrent.py +++ b/python/packages/core/tests/workflow/test_concurrent.py @@ -52,6 +52,20 @@ def test_concurrent_builder_rejects_duplicate_executors() -> None: ConcurrentBuilder().participants([a, b]) +def test_concurrent_builder_rejects_duplicate_executors_from_factories() -> None: + """Test that duplicate executor IDs from factories are detected at build time.""" + + def create_dup1() -> Executor: + return _FakeAgentExec("dup", "A") + + def create_dup2() -> Executor: + return _FakeAgentExec("dup", "B") # same executor id + + builder = ConcurrentBuilder().register_participants([create_dup1, create_dup2]) + with pytest.raises(ValueError, match="Duplicate"): + builder.build() + + async def test_concurrent_default_aggregator_emits_single_user_and_assistants() -> None: # Three synthetic agent executors e1 = _FakeAgentExec("agentA", "Alpha") @@ -232,7 +246,7 @@ async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowCon async def test_concurrent_with_aggregator_executor_factory_with_default_id() -> None: - """Test with_aggregator using an Executor factory.""" + """Test with_aggregator using an Executor class directly as factory (with default __init__ parameters).""" class CustomAggregator(Executor): def __init__(self, id: str = "default_aggregator") -> None: diff --git a/python/samples/getting_started/workflows/orchestration/concurrent_participant_factory.py b/python/samples/getting_started/workflows/orchestration/concurrent_participant_factory.py index ede448ce0b..8658e508d8 100644 --- a/python/samples/getting_started/workflows/orchestration/concurrent_participant_factory.py +++ b/python/samples/getting_started/workflows/orchestration/concurrent_participant_factory.py @@ -80,7 +80,7 @@ def create_legal() -> ChatAgent: class SummarizationExecutor(Executor): """Custom aggregator executor that synthesizes expert outputs into a concise summary.""" - def __init__(self): + def __init__(self) -> None: super().__init__(id="summarization_executor") self.chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) From 46453701f9b1e28b0220d4fd1abd0ef9f1f2729c Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Wed, 10 Dec 2025 10:59:49 -0800 Subject: [PATCH 04/12] Fix unit tests --- .../agent_framework/_workflows/_concurrent.py | 22 ++++++++++++------- .../core/tests/workflow/test_concurrent.py | 10 +++++++++ 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_concurrent.py b/python/packages/core/agent_framework/_workflows/_concurrent.py index 62a31c5011..794a2f3037 100644 --- a/python/packages/core/agent_framework/_workflows/_concurrent.py +++ b/python/packages/core/agent_framework/_workflows/_concurrent.py @@ -449,14 +449,20 @@ def build(self) -> Workflow: # Case 1: Executor instance - use directly aggregator = self._aggregator elif callable(self._aggregator): - # Distinguish between an aggregator factory and callback-based aggregator - # by checking the signature: factory has 0 params, callback has 1-2 params - sig = inspect.signature(self._aggregator) - param_count = len(sig.parameters) - - # Case 2: Executor factory (no parameters) - call it to create the executor - # Case 3: Callback with parameters (1-2 params) - wrap in _CallbackAggregator - aggregator = self._aggregator() if param_count == 0 else _CallbackAggregator(self._aggregator) # type: ignore + # Distinguish between an aggregator factory (could also be a class) and callback-based aggregator + if inspect.isclass(self._aggregator): + aggregator = self._aggregator() + else: + # Check the signature: factory has 0 params, callback has 1-2 params + sig = inspect.signature(self._aggregator) + param_count = len(sig.parameters) + + # Case 2: Executor factory (no parameters) - call it to create the executor + # Case 3: Callback with parameters (1-2 params) - wrap in _CallbackAggregator + aggregator = self._aggregator() if param_count == 0 else _CallbackAggregator(self._aggregator) # type: ignore + + if not isinstance(aggregator, Executor): + raise TypeError(f"Aggregator factory must return an Executor; got {type(aggregator).__name__}") else: # Case 4: No custom aggregator provided - use the default one aggregator = _AggregateAgentConversations(id="aggregator") diff --git a/python/packages/core/tests/workflow/test_concurrent.py b/python/packages/core/tests/workflow/test_concurrent.py index a35c3d5393..9da5d7ee6b 100644 --- a/python/packages/core/tests/workflow/test_concurrent.py +++ b/python/packages/core/tests/workflow/test_concurrent.py @@ -281,6 +281,16 @@ async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowCon assert output == "One | Two" +def test_concurrent_with_aggregator_executor_factory_fail_with_type_mismatch() -> None: + """Test with_aggregator using an Executor class directly as factory (with default __init__ parameters).""" + + e1 = _FakeAgentExec("agentA", "One") + e2 = _FakeAgentExec("agentB", "Two") + + with pytest.raises(TypeError): + ConcurrentBuilder().participants([e1, e2]).with_aggregator(lambda: "Mock Aggregator").build() # type: ignore + + async def test_concurrent_checkpoint_resume_round_trip() -> None: storage = InMemoryCheckpointStorage() From 4bb96b3f040c90927030094d40ecaeba8da28c0c Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Wed, 10 Dec 2025 11:15:29 -0800 Subject: [PATCH 05/12] Fix import --- python/packages/core/tests/workflow/test_concurrent.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/packages/core/tests/workflow/test_concurrent.py b/python/packages/core/tests/workflow/test_concurrent.py index 9da5d7ee6b..bc53f2f27b 100644 --- a/python/packages/core/tests/workflow/test_concurrent.py +++ b/python/packages/core/tests/workflow/test_concurrent.py @@ -1,8 +1,9 @@ # Copyright (c) Microsoft. All rights reserved. -from typing import Any, Never, cast +from typing import Any, cast import pytest +from typing_extensions import Never from agent_framework import ( AgentExecutorRequest, From 42b8e7c85fb95e9032e7b61a2b62ebeeba0793b5 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Wed, 10 Dec 2025 14:10:20 -0800 Subject: [PATCH 06/12] Prevent multiple calls to set participants or factories --- .../packages/core/agent_framework/_workflows/_concurrent.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/packages/core/agent_framework/_workflows/_concurrent.py b/python/packages/core/agent_framework/_workflows/_concurrent.py index 794a2f3037..568515dacc 100644 --- a/python/packages/core/agent_framework/_workflows/_concurrent.py +++ b/python/packages/core/agent_framework/_workflows/_concurrent.py @@ -277,6 +277,9 @@ class MyCustomExecutor(Executor): ... "Cannot mix .participants([...]) and .register_participants() in the same builder instance." ) + if self._participant_factories: + raise ValueError("register_participants() has already been called on this builder instance.") + if not participant_factories: raise ValueError("participant_factories cannot be empty") @@ -308,6 +311,9 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Con "Cannot mix .participants([...]) and .register_participants() in the same builder instance." ) + if self._participants: + raise ValueError("participants() has already been called on this builder instance.") + if not participants: raise ValueError("participants cannot be empty") From 14f74f096eca060cea1f3678da19a93903e46b2d Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Wed, 10 Dec 2025 14:15:30 -0800 Subject: [PATCH 07/12] Add comments --- .../packages/core/agent_framework/_workflows/_concurrent.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_concurrent.py b/python/packages/core/agent_framework/_workflows/_concurrent.py index 568515dacc..7bb3f5e91e 100644 --- a/python/packages/core/agent_framework/_workflows/_concurrent.py +++ b/python/packages/core/agent_framework/_workflows/_concurrent.py @@ -246,7 +246,8 @@ def register_participants( is wired as a parallel branch using fan-out edges from an internal dispatcher. Raises: - ValueError: if `participant_factories` is empty or `.participants()` was called + ValueError: if `participant_factories` is empty or `.participants()` + or `.register_participants()` were already called Example: @@ -294,7 +295,8 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Con from an internal dispatcher. Raises: - ValueError: if `participants` is empty, contains duplicates, or `.register_participants()` was called + ValueError: if `participants` is empty, contains duplicates, or `.register_participants()` + or `.participants()` were already called TypeError: if any entry is not AgentProtocol or Executor Example: From ad67b4d0a6731d2bc15a0f7f8c86468f79073cc9 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Wed, 10 Dec 2025 15:11:58 -0800 Subject: [PATCH 08/12] Mitigate warnings --- .../agent_framework/_workflows/_concurrent.py | 55 +++++++++++++++---- .../_workflows/_workflow_builder.py | 20 +++++-- .../core/tests/workflow/test_concurrent.py | 2 +- .../tests/workflow/test_workflow_builder.py | 14 +++++ 4 files changed, 73 insertions(+), 18 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_concurrent.py b/python/packages/core/agent_framework/_workflows/_concurrent.py index 7bb3f5e91e..db05fe7eb2 100644 --- a/python/packages/core/agent_framework/_workflows/_concurrent.py +++ b/python/packages/core/agent_framework/_workflows/_concurrent.py @@ -3,6 +3,7 @@ import asyncio import inspect import logging +import uuid from collections.abc import Callable, Sequence from typing import Any @@ -488,18 +489,50 @@ def build(self) -> Workflow: participants = self._participants builder = WorkflowBuilder() - builder.set_start_executor(dispatcher) - builder.add_fan_out_edges(dispatcher, list(participants)) - - if self._request_info_enabled: - # Insert interceptor between fan-in and aggregator - # participants -> fan-in -> interceptor -> aggregator - request_info_interceptor = RequestInfoInterceptor(executor_id="request_info") - builder.add_fan_in_edges(list(participants), request_info_interceptor) - builder.add_edge(request_info_interceptor, aggregator) + if self._participant_factories: + # Register executors/agents to avoid warnings from the workflow builder + # if factories are provided instead of direct instances. This doesn't + # break the factory pattern since the concurrent builder still creates + # new instances per workflow build. + factory_names: list[str] = [] + for p in participants: + factory_name = uuid.uuid4().hex + factory_names.append(factory_name) + if isinstance(p, Executor): + builder.register_executor(lambda p=p: p, name=factory_name) + else: + builder.register_agent(lambda p=p: p, name=factory_name) + # Register the dispatcher and the aggregator + builder.register_executor(lambda: dispatcher, name="dispatcher") + builder.register_executor(lambda: aggregator, name="aggregator") + + builder.set_start_executor("dispatcher") + builder.add_fan_out_edges("dispatcher", factory_names) + if self._request_info_enabled: + # Insert interceptor between fan-in and aggregator + # participants -> fan-in -> interceptor -> aggregator + builder.register_executor( + lambda: RequestInfoInterceptor(executor_id="request_info"), + name="request_info_interceptor", + ) + builder.add_fan_in_edges(factory_names, "request_info_interceptor") + builder.add_edge("request_info_interceptor", "aggregator") + else: + # Direct fan-in to aggregator + builder.add_fan_in_edges(factory_names, "aggregator") else: - # Direct fan-in to aggregator - builder.add_fan_in_edges(list(participants), aggregator) + builder.set_start_executor(dispatcher) + builder.add_fan_out_edges(dispatcher, list(participants)) + + if self._request_info_enabled: + # Insert interceptor between fan-in and aggregator + # participants -> fan-in -> interceptor -> aggregator + request_info_interceptor = RequestInfoInterceptor(executor_id="request_info") + builder.add_fan_in_edges(list(participants), request_info_interceptor) + builder.add_edge(request_info_interceptor, aggregator) + else: + # Direct fan-in to aggregator + builder.add_fan_in_edges(list(participants), aggregator) if self._checkpoint_storage is not None: builder = builder.with_checkpointing(self._checkpoint_storage) diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py index 26cd0213e4..9a2eab4a8a 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py @@ -1148,21 +1148,29 @@ def _resolve_edge_registry(self) -> tuple[Executor, list[Executor], list[EdgeGro if isinstance(self._start_executor, Executor): start_executor = self._start_executor - executors: dict[str, Executor] = {} + # Maps registered factory names to created executor instances for edge resolution + factory_name_to_instance: dict[str, Executor] = {} + # Maps executor IDs to created executor instances to prevent duplicates + executor_id_to_instance: dict[str, Executor] = {} deferred_edge_groups: list[EdgeGroup] = [] for name, exec_factory in self._executor_registry.items(): instance = exec_factory() + if instance.id in executor_id_to_instance: + raise ValueError(f"Executor with ID '{instance.id}' has already been registered.") + executor_id_to_instance[instance.id] = instance + if isinstance(self._start_executor, str) and name == self._start_executor: start_executor = instance + # All executors will get their own internal edge group for receiving system messages deferred_edge_groups.append(InternalEdgeGroup(instance.id)) # type: ignore[call-arg] - executors[name] = instance + factory_name_to_instance[name] = instance def _get_executor(name: str) -> Executor: """Helper to get executor by the registered name. Raises if not found.""" - if name not in executors: - raise ValueError(f"Executor with name '{name}' has not been registered.") - return executors[name] + if name not in factory_name_to_instance: + raise ValueError(f"Executor with factory name '{name}' has not been registered.") + return factory_name_to_instance[name] for registration in self._edge_registry: match registration: @@ -1201,7 +1209,7 @@ def _get_executor(name: str) -> Executor: if start_executor is None: raise ValueError("Failed to resolve starting executor from registered factories.") - return start_executor, list(executors.values()), deferred_edge_groups + return start_executor, list(executor_id_to_instance.values()), deferred_edge_groups def build(self) -> Workflow: """Build and return the constructed workflow. diff --git a/python/packages/core/tests/workflow/test_concurrent.py b/python/packages/core/tests/workflow/test_concurrent.py index bc53f2f27b..194ca960d2 100644 --- a/python/packages/core/tests/workflow/test_concurrent.py +++ b/python/packages/core/tests/workflow/test_concurrent.py @@ -63,7 +63,7 @@ def create_dup2() -> Executor: return _FakeAgentExec("dup", "B") # same executor id builder = ConcurrentBuilder().register_participants([create_dup1, create_dup2]) - with pytest.raises(ValueError, match="Duplicate"): + with pytest.raises(ValueError, match="Executor with ID 'dup' has already been registered."): builder.build() diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py index a037bf51b6..9dcfab9487 100644 --- a/python/packages/core/tests/workflow/test_workflow_builder.py +++ b/python/packages/core/tests/workflow/test_workflow_builder.py @@ -293,6 +293,20 @@ def test_register_duplicate_name_raises_error(): builder.register_executor(lambda: MockExecutor(id="executor_2"), name="MyExecutor") +def test_register_duplicate_id_raises_error(): + """Test that registering duplicate names raises an error.""" + builder = WorkflowBuilder() + + # Register first executor + builder.register_executor(lambda: MockExecutor(id="executor"), name="MyExecutor1") + builder.register_executor(lambda: MockExecutor(id="executor"), name="MyExecutor2") + builder.set_start_executor("MyExecutor1") + + # Registering second executor with same name should raise ValueError + with pytest.raises(ValueError, match="Executor with ID 'executor' has already been registered."): + builder.build() + + def test_register_agent_basic(): """Test basic agent registration with lazy initialization.""" builder = WorkflowBuilder() From 20cb9364b6fedc056aef290f0326fe72c5f9f848 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Wed, 10 Dec 2025 15:58:54 -0800 Subject: [PATCH 09/12] Fix mypy --- .../packages/core/agent_framework/_workflows/_concurrent.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_concurrent.py b/python/packages/core/agent_framework/_workflows/_concurrent.py index db05fe7eb2..c45f8d38b3 100644 --- a/python/packages/core/agent_framework/_workflows/_concurrent.py +++ b/python/packages/core/agent_framework/_workflows/_concurrent.py @@ -499,9 +499,9 @@ def build(self) -> Workflow: factory_name = uuid.uuid4().hex factory_names.append(factory_name) if isinstance(p, Executor): - builder.register_executor(lambda p=p: p, name=factory_name) + builder.register_executor(lambda executor=p: executor, name=factory_name) # type: ignore[misc] else: - builder.register_agent(lambda p=p: p, name=factory_name) + builder.register_agent(lambda agent=p: agent, name=factory_name) # type: ignore[misc] # Register the dispatcher and the aggregator builder.register_executor(lambda: dispatcher, name="dispatcher") builder.register_executor(lambda: aggregator, name="aggregator") From 6fa7d4ab4da481b3dd02bc870523d3be073442f9 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Wed, 10 Dec 2025 16:53:23 -0800 Subject: [PATCH 10/12] Address comments --- .../agent_framework/_workflows/_concurrent.py | 157 +++++++++++------- .../core/tests/workflow/test_concurrent.py | 111 ++++++++++++- .../concurrent_participant_factory.py | 7 +- 3 files changed, 201 insertions(+), 74 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_concurrent.py b/python/packages/core/agent_framework/_workflows/_concurrent.py index c45f8d38b3..c84ab2d5df 100644 --- a/python/packages/core/agent_framework/_workflows/_concurrent.py +++ b/python/packages/core/agent_framework/_workflows/_concurrent.py @@ -194,6 +194,7 @@ class ConcurrentBuilder: or Executor factories - `build()` wires: dispatcher -> fan-out -> participants -> fan-in -> aggregator. - `with_aggregator(...)` overrides the default aggregator with an Executor or callback. + - `register_aggregator(...)` accepts a factory for an Executor as custom aggregator. Usage: @@ -216,6 +217,22 @@ def summarize(results: list[AgentExecutorResponse]) -> str: workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_aggregator(summarize).build() + + # Custom aggregator via a factory + class MyAggregator(Executor): + @handler + async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None: + await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results)) + + + workflow = ( + ConcurrentBuilder() + .register_participants([create_agent1, create_agent2, create_agent3]) + .register_aggregator(lambda: MyAggregator(id="my_aggregator")) + .build() + ) + + # Enable checkpoint persistence so runs can resume workflow = ConcurrentBuilder().participants([agent1, agent2, agent3]).with_checkpointing(storage).build() @@ -226,13 +243,8 @@ def summarize(results: list[AgentExecutorResponse]) -> str: def __init__(self) -> None: self._participants: list[AgentProtocol | Executor] = [] self._participant_factories: list[Callable[[], AgentProtocol | Executor]] = [] - self._aggregator: ( - Executor - | Callable[[], Executor] - | Callable[[list[AgentExecutorResponse]], Any] - | Callable[[list[AgentExecutorResponse], WorkflowContext[Never, Any]], Any] - | None - ) = None + self._aggregator: Executor | None = None + self._aggregator_factory: Callable[[], Executor] | None = None self._checkpoint_storage: CheckpointStorage | None = None self._request_info_enabled: bool = False @@ -246,6 +258,9 @@ def register_participants( by a chat client) or Executor instances. Each participant created by a factory is wired as a parallel branch using fan-out edges from an internal dispatcher. + Args: + participant_factories: Sequence of callables returning AgentProtocol or Executor instances + Raises: ValueError: if `participant_factories` is empty or `.participants()` or `.register_participants()` were already called @@ -295,6 +310,9 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Con instances. Each participant is wired as a parallel branch using fan-out edges from an internal dispatcher. + Args: + participants: Sequence of AgentProtocol or Executor instances + Raises: ValueError: if `participants` is empty, contains duplicates, or `.register_participants()` or `.participants()` were already called @@ -339,23 +357,58 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Con self._participants = list(participants) return self + def register_aggregator(self, aggregator_factory: Callable[[], Executor]) -> "ConcurrentBuilder": + r"""Define a custom aggregator for this concurrent workflow. + + Accepts a factory (callable) that returns an Executor instance. The executor + should handle `list[AgentExecutorResponse]` and yield output using `ctx.yield_output(...)`. + + Args: + aggregator_factory: Callable that returns an Executor instance + + Example: + .. code-block:: python + + @executor(id="custom_aggregator") + async def custom_aggregator(results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None: + await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results)) + + + wf = ( + ConcurrentBuilder() + .register_participants([create_researcher, create_marketer, create_legal]) + .register_aggregator(lambda: custom_aggregator) + .build() + ) + """ + if self._aggregator is not None: + raise ValueError( + "Cannot mix .with_aggregator(...) and .register_aggregator(...) in the same builder instance." + ) + + if self._aggregator_factory is not None: + raise ValueError("register_aggregator() has already been called on this builder instance.") + + self._aggregator_factory = aggregator_factory + return self + def with_aggregator( self, aggregator: Executor - | Callable[[], Executor] | Callable[[list[AgentExecutorResponse]], Any] | Callable[[list[AgentExecutorResponse], WorkflowContext[Never, Any]], Any], ) -> "ConcurrentBuilder": r"""Override the default aggregator with an executor, an executor factory, or a callback. - Executor: must handle `list[AgentExecutorResponse]` and yield output using `ctx.yield_output(...)` - - Executor factory: callable returning an Executor instance that handles `list[AgentExecutorResponse]` - and yields output using `ctx.yield_output(...)` - Callback: sync or async callable with one of the signatures: `(results: list[AgentExecutorResponse]) -> Any | None` or `(results: list[AgentExecutorResponse], ctx: WorkflowContext) -> Any | None`. If the callback returns a non-None value, it becomes the workflow's output. + Args: + aggregator: Executor instance, or callback function + Example: .. code-block:: python @@ -368,14 +421,6 @@ async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowCon wf = ConcurrentBuilder().participants([a1, a2, a3]).with_aggregator(CustomAggregator()).build() - # Factory-based aggregator - wf = ( - ConcurrentBuilder() - .participants([a1, a2, a3]) - .with_aggregator(lambda: CustomAggregator(id="custom_aggregator")) - .build() - ) - # Callback-based aggregator (string result) async def summarize(results: list[AgentExecutorResponse]) -> str: @@ -392,15 +437,29 @@ async def summarize(results: list[AgentExecutorResponse], ctx: WorkflowContext[N wf = ConcurrentBuilder().participants([a1, a2, a3]).with_aggregator(summarize).build() """ - if isinstance(aggregator, Executor) or callable(aggregator): + if self._aggregator_factory is not None: + raise ValueError( + "Cannot mix .with_aggregator(...) and .register_aggregator(...) in the same builder instance." + ) + + if self._aggregator is not None: + raise ValueError("with_aggregator() has already been called on this builder instance.") + + if isinstance(aggregator, Executor): self._aggregator = aggregator + elif callable(aggregator): + self._aggregator = _CallbackAggregator(aggregator) else: raise TypeError("aggregator must be an Executor or a callable") return self def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "ConcurrentBuilder": - """Enable checkpoint persistence using the provided storage backend.""" + """Enable checkpoint persistence using the provided storage backend. + + Args: + checkpoint_storage: CheckpointStorage instance for persisting workflow state + """ self._checkpoint_storage = checkpoint_storage return self @@ -454,39 +513,15 @@ def build(self) -> Workflow: # Internal nodes dispatcher = _DispatchToAllParticipants(id="dispatcher") - if isinstance(self._aggregator, Executor): - # Case 1: Executor instance - use directly - aggregator = self._aggregator - elif callable(self._aggregator): - # Distinguish between an aggregator factory (could also be a class) and callback-based aggregator - if inspect.isclass(self._aggregator): - aggregator = self._aggregator() - else: - # Check the signature: factory has 0 params, callback has 1-2 params - sig = inspect.signature(self._aggregator) - param_count = len(sig.parameters) - - # Case 2: Executor factory (no parameters) - call it to create the executor - # Case 3: Callback with parameters (1-2 params) - wrap in _CallbackAggregator - aggregator = self._aggregator() if param_count == 0 else _CallbackAggregator(self._aggregator) # type: ignore - - if not isinstance(aggregator, Executor): - raise TypeError(f"Aggregator factory must return an Executor; got {type(aggregator).__name__}") - else: - # Case 4: No custom aggregator provided - use the default one - aggregator = _AggregateAgentConversations(id="aggregator") - - participants: list[Executor | AgentProtocol] = [] - if self._participant_factories: - for factory in self._participant_factories: - p = factory() - if not isinstance(p, (AgentProtocol, Executor)): - raise TypeError( - f"Participant factory must return AgentProtocol or Executor; got {type(p).__name__}" - ) - participants.append(p) - else: - participants = self._participants + aggregator = ( + self._aggregator + if self._aggregator is not None + else ( + self._aggregator_factory() + if self._aggregator_factory is not None + else _AggregateAgentConversations(id="aggregator") + ) + ) builder = WorkflowBuilder() if self._participant_factories: @@ -495,13 +530,14 @@ def build(self) -> Workflow: # break the factory pattern since the concurrent builder still creates # new instances per workflow build. factory_names: list[str] = [] - for p in participants: + for factory in self._participant_factories: factory_name = uuid.uuid4().hex factory_names.append(factory_name) - if isinstance(p, Executor): - builder.register_executor(lambda executor=p: executor, name=factory_name) # type: ignore[misc] + instance = factory() + if isinstance(instance, Executor): + builder.register_executor(lambda executor=instance: executor, name=factory_name) # type: ignore[misc] else: - builder.register_agent(lambda agent=p: agent, name=factory_name) # type: ignore[misc] + builder.register_agent(lambda agent=instance: agent, name=factory_name) # type: ignore[misc] # Register the dispatcher and the aggregator builder.register_executor(lambda: dispatcher, name="dispatcher") builder.register_executor(lambda: aggregator, name="aggregator") @@ -522,18 +558,17 @@ def build(self) -> Workflow: builder.add_fan_in_edges(factory_names, "aggregator") else: builder.set_start_executor(dispatcher) - builder.add_fan_out_edges(dispatcher, list(participants)) + builder.add_fan_out_edges(dispatcher, self._participants) if self._request_info_enabled: # Insert interceptor between fan-in and aggregator # participants -> fan-in -> interceptor -> aggregator request_info_interceptor = RequestInfoInterceptor(executor_id="request_info") - builder.add_fan_in_edges(list(participants), request_info_interceptor) + builder.add_fan_in_edges(self._participants, request_info_interceptor) builder.add_edge(request_info_interceptor, aggregator) else: # Direct fan-in to aggregator - builder.add_fan_in_edges(list(participants), aggregator) - + builder.add_fan_in_edges(self._participants, aggregator) if self._checkpoint_storage is not None: builder = builder.with_checkpointing(self._checkpoint_storage) diff --git a/python/packages/core/tests/workflow/test_concurrent.py b/python/packages/core/tests/workflow/test_concurrent.py index 194ca960d2..fe572b8f48 100644 --- a/python/packages/core/tests/workflow/test_concurrent.py +++ b/python/packages/core/tests/workflow/test_concurrent.py @@ -67,6 +67,41 @@ def create_dup2() -> Executor: builder.build() +def test_concurrent_builder_rejects_mixed_participants_and_factories() -> None: + """Test that mixing .participants() and .register_participants() raises an error.""" + # Case 1: participants first, then register_participants + with pytest.raises(ValueError, match="Cannot mix .participants"): + ( + ConcurrentBuilder() + .participants([_FakeAgentExec("a", "A")]) + .register_participants([lambda: _FakeAgentExec("b", "B")]) + ) + + # Case 2: register_participants first, then participants + with pytest.raises(ValueError, match="Cannot mix .participants"): + ( + ConcurrentBuilder() + .register_participants([lambda: _FakeAgentExec("a", "A")]) + .participants([_FakeAgentExec("b", "B")]) + ) + + +def test_concurrent_builder_rejects_multiple_calls_to_participants() -> None: + """Test that multiple calls to .participants() raises an error.""" + with pytest.raises(ValueError, match=r"participants\(\) has already been called"): + (ConcurrentBuilder().participants([_FakeAgentExec("a", "A")]).participants([_FakeAgentExec("b", "B")])) + + +def test_concurrent_builder_rejects_multiple_calls_to_register_participants() -> None: + """Test that multiple calls to .register_participants() raises an error.""" + with pytest.raises(ValueError, match=r"register_participants\(\) has already been called"): + ( + ConcurrentBuilder() + .register_participants([lambda: _FakeAgentExec("a", "A")]) + .register_participants([lambda: _FakeAgentExec("b", "B")]) + ) + + async def test_concurrent_default_aggregator_emits_single_user_and_assistants() -> None: # Three synthetic agent executors e1 = _FakeAgentExec("agentA", "Alpha") @@ -226,7 +261,7 @@ async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowCon wf = ( ConcurrentBuilder() .participants([e1, e2]) - .with_aggregator(lambda: CustomAggregator(id="custom_aggregator")) + .register_aggregator(lambda: CustomAggregator(id="custom_aggregator")) .build() ) @@ -264,7 +299,7 @@ async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowCon e1 = _FakeAgentExec("agentA", "One") e2 = _FakeAgentExec("agentB", "Two") - wf = ConcurrentBuilder().participants([e1, e2]).with_aggregator(CustomAggregator).build() + wf = ConcurrentBuilder().participants([e1, e2]).register_aggregator(CustomAggregator).build() completed = False output: str | None = None @@ -282,14 +317,28 @@ async def aggregate(self, results: list[AgentExecutorResponse], ctx: WorkflowCon assert output == "One | Two" -def test_concurrent_with_aggregator_executor_factory_fail_with_type_mismatch() -> None: - """Test with_aggregator using an Executor class directly as factory (with default __init__ parameters).""" +def test_concurrent_builder_rejects_multiple_calls_to_with_aggregator() -> None: + """Test that multiple calls to .with_aggregator() raises an error.""" - e1 = _FakeAgentExec("agentA", "One") - e2 = _FakeAgentExec("agentB", "Two") + def summarize(results: list[AgentExecutorResponse]) -> str: # type: ignore[override] + return str(len(results)) + + with pytest.raises(ValueError, match=r"with_aggregator\(\) has already been called"): + (ConcurrentBuilder().with_aggregator(summarize).with_aggregator(summarize)) + + +def test_concurrent_builder_rejects_multiple_calls_to_register_aggregator() -> None: + """Test that multiple calls to .register_aggregator() raises an error.""" - with pytest.raises(TypeError): - ConcurrentBuilder().participants([e1, e2]).with_aggregator(lambda: "Mock Aggregator").build() # type: ignore + class CustomAggregator(Executor): + pass + + with pytest.raises(ValueError, match=r"register_aggregator\(\) has already been called"): + ( + ConcurrentBuilder() + .register_aggregator(lambda: CustomAggregator(id="agg1")) + .register_aggregator(lambda: CustomAggregator(id="agg2")) + ) async def test_concurrent_checkpoint_resume_round_trip() -> None: @@ -437,6 +486,52 @@ def test_concurrent_builder_rejects_mixing_participants_and_factories() -> None: ) +async def test_concurrent_builder_reusable_after_build_with_participants() -> None: + """Test that the builder can be reused to build multiple identical workflows with participants().""" + e1 = _FakeAgentExec("agentA", "One") + e2 = _FakeAgentExec("agentB", "Two") + + builder = ConcurrentBuilder().participants([e1, e2]) + + builder.build() + + assert builder._participants[0] is e1 # type: ignore + assert builder._participants[1] is e2 # type: ignore + assert builder._participant_factories == [] # type: ignore + + +async def test_concurrent_builder_reusable_after_build_with_factories() -> None: + """Test that the builder can be reused to build multiple workflows with register_participants().""" + call_count = 0 + + def create_agent_executor_a() -> Executor: + nonlocal call_count + call_count += 1 + return _FakeAgentExec("agentA", "One") + + def create_agent_executor_b() -> Executor: + nonlocal call_count + call_count += 1 + return _FakeAgentExec("agentB", "Two") + + builder = ConcurrentBuilder().register_participants([create_agent_executor_a, create_agent_executor_b]) + + # Build the first workflow + wf1 = builder.build() + + assert builder._participants == [] # type: ignore + assert len(builder._participant_factories) == 2 # type: ignore + assert call_count == 2 + + # Build the second workflow + wf2 = builder.build() + assert call_count == 4 + + # Verify that the two workflows have different executor instances + assert wf1.executors["agentA"] is not wf2.executors["agentA"] + assert wf1.executors["agentB"] is not wf2.executors["agentB"] + + async def test_concurrent_with_register_participants() -> None: """Test workflow creation using register_participants with factories.""" diff --git a/python/samples/getting_started/workflows/orchestration/concurrent_participant_factory.py b/python/samples/getting_started/workflows/orchestration/concurrent_participant_factory.py index 8658e508d8..435e59b2ba 100644 --- a/python/samples/getting_started/workflows/orchestration/concurrent_participant_factory.py +++ b/python/samples/getting_started/workflows/orchestration/concurrent_participant_factory.py @@ -124,14 +124,11 @@ async def main() -> None: # Create a concurrent builder with participant factories and a custom aggregator # - register_participants([...]) accepts factory functions that return # AgentProtocol (agents) or Executor instances. - # - with_aggregator(...) overrides the default aggregator: - # • Default aggregator -> returns list[ChatMessage] (one user + one assistant per agent) - # • Custom callback -> return value becomes workflow output (string here) - # • Custom Executor -> can yield outputs via ctx.yield_output(...) + # - register_aggregator(...) takes a factory function that returns an Executor instance. concurrent_builder = ( ConcurrentBuilder() .register_participants([create_researcher, create_marketer, create_legal]) - .with_aggregator(SummarizationExecutor) + .register_aggregator(SummarizationExecutor) ) # Build workflow_a From e1286840c17b1ff6f8ef3858c5363bfe783c3c18 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Wed, 10 Dec 2025 17:10:11 -0800 Subject: [PATCH 11/12] Address Copilot comments --- .../agent_framework/_workflows/_concurrent.py | 6 ++---- .../_workflows/_workflow_builder.py | 8 ++++---- .../core/tests/workflow/test_concurrent.py | 19 ------------------- .../tests/workflow/test_workflow_builder.py | 4 ++-- 4 files changed, 8 insertions(+), 29 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_concurrent.py b/python/packages/core/agent_framework/_workflows/_concurrent.py index c84ab2d5df..a6fcaa1a3e 100644 --- a/python/packages/core/agent_framework/_workflows/_concurrent.py +++ b/python/packages/core/agent_framework/_workflows/_concurrent.py @@ -369,15 +369,13 @@ def register_aggregator(self, aggregator_factory: Callable[[], Executor]) -> "Co Example: .. code-block:: python - @executor(id="custom_aggregator") - async def custom_aggregator(results: list[AgentExecutorResponse], ctx: WorkflowContext[Never, str]) -> None: - await ctx.yield_output(" | ".join(r.agent_run_response.messages[-1].text for r in results)) + class MyCustomExecutor(Executor): ... wf = ( ConcurrentBuilder() .register_participants([create_researcher, create_marketer, create_legal]) - .register_aggregator(lambda: custom_aggregator) + .register_aggregator(lambda: MyCustomExecutor(id="my_aggregator")) .build() ) """ diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py index 9a2eab4a8a..5bf36b6ccd 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py @@ -374,7 +374,7 @@ def register_agent( ) """ if name in self._executor_registry: - raise ValueError(f"An executor factory with the name '{name}' is already registered.") + raise ValueError(f"An agent factory with the name '{name}' is already registered.") def wrapped_factory() -> AgentExecutor: agent = factory_func() @@ -1156,7 +1156,7 @@ def _resolve_edge_registry(self) -> tuple[Executor, list[Executor], list[EdgeGro for name, exec_factory in self._executor_registry.items(): instance = exec_factory() if instance.id in executor_id_to_instance: - raise ValueError(f"Executor with ID '{instance.id}' has already been registered.") + raise ValueError(f"Executor with ID '{instance.id}' has already been created.") executor_id_to_instance[instance.id] = instance if isinstance(self._start_executor, str) and name == self._start_executor: @@ -1169,7 +1169,7 @@ def _resolve_edge_registry(self) -> tuple[Executor, list[Executor], list[EdgeGro def _get_executor(name: str) -> Executor: """Helper to get executor by the registered name. Raises if not found.""" if name not in factory_name_to_instance: - raise ValueError(f"Executor with factory name '{name}' has not been registered.") + raise ValueError(f"Factory '{name}' has not been registered.") return factory_name_to_instance[name] for registration in self._edge_registry: @@ -1187,7 +1187,7 @@ def _get_executor(name: str) -> Executor: cases_converted: list[SwitchCaseEdgeGroupCase | SwitchCaseEdgeGroupDefault] = [] for case in cases: if not isinstance(case.target, str): - raise ValueError("Switch case target must be a registered executor name (str) if deferred.") + raise ValueError("Switch case target must be a registered factory name (str) if deferred.") target_exec = _get_executor(case.target) if isinstance(case, Default): cases_converted.append(SwitchCaseEdgeGroupDefault(target_id=target_exec.id)) diff --git a/python/packages/core/tests/workflow/test_concurrent.py b/python/packages/core/tests/workflow/test_concurrent.py index fe572b8f48..e1ff18969a 100644 --- a/python/packages/core/tests/workflow/test_concurrent.py +++ b/python/packages/core/tests/workflow/test_concurrent.py @@ -467,25 +467,6 @@ def test_concurrent_builder_rejects_empty_participant_factories() -> None: ConcurrentBuilder().register_participants([]) -def test_concurrent_builder_rejects_mixing_participants_and_factories() -> None: - """Test that mixing .participants() and .register_participants() raises an error.""" - # Case 1: participants first, then register_participants - with pytest.raises(ValueError, match="Cannot mix .participants"): - ( - ConcurrentBuilder() - .participants([_FakeAgentExec("a", "A")]) - .register_participants([lambda: _FakeAgentExec("b", "B")]) - ) - - # Case 2: register_participants first, then participants - with pytest.raises(ValueError, match="Cannot mix .participants"): - ( - ConcurrentBuilder() - .register_participants([lambda: _FakeAgentExec("a", "A")]) - .participants([_FakeAgentExec("b", "B")]) - ) - - async def test_concurrent_builder_reusable_after_build_with_participants() -> None: """Test that the builder can be reused to build multiple identical workflows with participants().""" e1 = _FakeAgentExec("agentA", "One") diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py index 9dcfab9487..0b26837fbb 100644 --- a/python/packages/core/tests/workflow/test_workflow_builder.py +++ b/python/packages/core/tests/workflow/test_workflow_builder.py @@ -294,7 +294,7 @@ def test_register_duplicate_name_raises_error(): def test_register_duplicate_id_raises_error(): - """Test that registering duplicate names raises an error.""" + """Test that registering duplicate id raises an error.""" builder = WorkflowBuilder() # Register first executor @@ -302,7 +302,7 @@ def test_register_duplicate_id_raises_error(): builder.register_executor(lambda: MockExecutor(id="executor"), name="MyExecutor2") builder.set_start_executor("MyExecutor1") - # Registering second executor with same name should raise ValueError + # Registering second executor with same ID should raise ValueError with pytest.raises(ValueError, match="Executor with ID 'executor' has already been registered."): builder.build() From 8d11dccbfedf1c4a7912319cd1f0b6186410de54 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Wed, 10 Dec 2025 17:16:44 -0800 Subject: [PATCH 12/12] Fix tests --- python/packages/core/tests/workflow/test_concurrent.py | 2 +- python/packages/core/tests/workflow/test_workflow_builder.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/packages/core/tests/workflow/test_concurrent.py b/python/packages/core/tests/workflow/test_concurrent.py index e1ff18969a..66cc8cfc68 100644 --- a/python/packages/core/tests/workflow/test_concurrent.py +++ b/python/packages/core/tests/workflow/test_concurrent.py @@ -63,7 +63,7 @@ def create_dup2() -> Executor: return _FakeAgentExec("dup", "B") # same executor id builder = ConcurrentBuilder().register_participants([create_dup1, create_dup2]) - with pytest.raises(ValueError, match="Executor with ID 'dup' has already been registered."): + with pytest.raises(ValueError, match="Executor with ID 'dup' has already been created."): builder.build() diff --git a/python/packages/core/tests/workflow/test_workflow_builder.py b/python/packages/core/tests/workflow/test_workflow_builder.py index 0b26837fbb..b281edee34 100644 --- a/python/packages/core/tests/workflow/test_workflow_builder.py +++ b/python/packages/core/tests/workflow/test_workflow_builder.py @@ -303,7 +303,7 @@ def test_register_duplicate_id_raises_error(): builder.set_start_executor("MyExecutor1") # Registering second executor with same ID should raise ValueError - with pytest.raises(ValueError, match="Executor with ID 'executor' has already been registered."): + with pytest.raises(ValueError, match="Executor with ID 'executor' has already been created."): builder.build()