Skip to content

Python: [Python] The event.data of ExecutorInvokedEvent contains incorrect input data for custom start executor #2929

@a1exwang

Description

@a1exwang

I followed this sample code to get the executor input & output. https://github.com/microsoft/agent-framework/blob/main/python/samples/getting_started/workflows/observability/executor_io_observation.py

Repro steps:

  1. Run the code sample
  2. The terminal output is something like
Running workflow with executor I/O observation...

[INVOKED] Writer
    Input: list: [ChatMessage(role='user', text='hi'), ChatMessage(role='assistant', text='Hello! How can I assist you today?')]
[COMPLETED] Writer
    Output: list: [list: [ChatMessage(role='user', text='hi'), ChatMessage(role='assistant', text='Hello! How can I assist you today?')]]
  1. The problem is, the input contains both both "hi" and 'Hello! How can I assist you today?'. But the 'Hello! How can I assist you today?' part should be output

Code sample:

import asyncio
from typing import Any, cast

from agent_framework import (
    WorkflowBuilder,
    WorkflowContext, ChatMessage,
    executor, ExecutorInvokedEvent, ExecutorCompletedEvent,
    WorkflowOutputEvent
)
from agent_framework.azure import AzureAIAgentClient
from azure.identity.aio import DefaultAzureCredential
from dotenv import load_dotenv


load_dotenv()

def build_workflow(chat_client):
    writer_agent = chat_client.create_agent(
        name="writer",
        instructions=(
            "You are an excellent content writer. "
            "Create clear, engaging content based on the user's request. "
            "Focus on clarity, accuracy, and proper structure."
        ),
    )
    @executor(id="Writer")
    async def writer(messages: list[ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None:
        response = await writer_agent.run(messages)
        messages.extend(response.messages)
        await ctx.send_message(messages)

    workflow = (
        WorkflowBuilder(
            name="Content Review Workflow",
            description="Multi-agent content creation workflow with quality-based routing (Writer → Reviewer → Editor/Publisher)",
        )
        .set_start_executor(writer)
        .build()
    )
    
    return workflow

def format_io_data(data: Any) -> str:
    """Format executor I/O data for display.

    This helper formats common data types for readable output.
    Customize based on the types used in your workflow.
    """
    type_name = type(data).__name__

    if data is None:
        return "None"
    if isinstance(data, str):
        preview = data[:80] + "..." if len(data) > 80 else data
        return f"{type_name}: '{preview}'"
    if isinstance(data, list):
        data_list = cast(list[Any], data)
        if len(data_list) == 0:
            return f"{type_name}: []"
        # For sent_messages, show each item with its type
        if len(data_list) <= 3:
            items = [format_io_data(item) for item in data_list]
            return f"{type_name}: [{', '.join(items)}]"
        return f"{type_name}: [{len(data_list)} items]"
    if isinstance(data, ChatMessage):
        role_str = data.role.value if hasattr(data.role, 'value') else str(data.role)
        text_preview = data.text[:80] + "..." if len(data.text) > 80 else data.text
        return f"ChatMessage(role='{role_str}', text='{text_preview}')"
    return f"{type_name}: {repr(data)}"



async def main() -> None:
    async with DefaultAzureCredential() as credential:
        async with AzureAIAgentClient(credential=credential) as chat_client:
            workflow = build_workflow(chat_client)
            print("Running workflow with executor I/O observation...\n")

            async for event in workflow.run_stream([ChatMessage(role="user", text="hi")]):
                if isinstance(event, ExecutorInvokedEvent):
                    # The input message received by the executor is in event.data
                    print(f"[INVOKED] {event.executor_id}")
                    print(f"    Input: {format_io_data(event.data)}")

                elif isinstance(event, ExecutorCompletedEvent):
                    # Messages sent via ctx.send_message() are in event.data
                    print(f"[COMPLETED] {event.executor_id}")
                    if event.data:
                        print(f"    Output: {format_io_data(event.data)}")

                elif isinstance(event, WorkflowOutputEvent):
                    print(f"[WORKFLOW OUTPUT] {format_io_data(event.data)}")

if __name__ == "__main__":
    asyncio.run(main())

Metadata

Metadata

Assignees

Labels

pythonv1.0Features being tracked for the version 1.0 GAworkflowsRelated to Workflows in agent-framework

Type

No type

Projects

Status

Done

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions