Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion docs/durable_execution/temporal.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,22 @@ The event stream handler function will receive the agent [run context][pydantic_
As the streaming model request activity, workflow, and workflow execution call all take place in separate processes, passing data between them requires some care:

- To get data from the workflow call site or workflow to the event stream handler, you can use a [dependencies object](#agent-run-context-and-dependencies).
- To get data from the event stream handler to the workflow, workflow call site, or a frontend, you need to use an external system that the event stream handler can write to and the event consumer can read from, like a message queue. You can use the dependency object to make sure the same connection string or other unique ID is available in all the places that need it.
- To get data from the event stream handler to the workflow, workflow call site, or a frontend, you need to use an external system that the event stream handler can write to and the event consumer can read from. Alternatively, you can use Temporal's built-in signals and queries to pass events from activities to the workflow and from the workflow to the caller.

#### Example

For a complete working example of streaming with Temporal using signals and queries, see the [temporal_streaming example](https://github.com/pydantic/pydantic-ai/tree/main/examples/pydantic_ai_examples/temporal_streaming). This example demonstrates:

- How to use an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] to capture agent events in activities
- Using Temporal signals to send events from activities to the workflow
- Using Temporal queries to poll events from the workflow to the caller
- Setting up dependencies to pass workflow identification for signal routing
- Integrating MCP toolsets and custom tools with streaming
- Complete project structure with all necessary files

The example includes a Yahoo Finance search agent with Python code execution capabilities, showing how to stream tool calls, model responses, and results in real-time during workflow execution.



## Activity Configuration

Expand Down
172 changes: 172 additions & 0 deletions examples/pydantic_ai_examples/temporal_streaming/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
# Temporal Streaming Example

This example demonstrates how to implement streaming with Pydantic AI agents in Temporal workflows. It showcases the streaming pattern described in the [Temporal documentation](../../../docs/durable_execution/temporal.md#streaming).

## Overview

The example implements a Yahoo Finance search agent that:
- Uses MCP (Model Context Protocol) toolsets for accessing financial data
- Executes Python code in a sandbox for data analysis
- Streams events during execution via Temporal signals and queries
- Provides durable execution with automatic retries

## Architecture

The streaming architecture works as follows:

1. **Agent Configuration** (`agents.py`): Defines the agent with MCP toolsets and custom Python execution tools
2. **Workflow** (`workflow.py`): Temporal workflow that orchestrates agent execution and manages event streams
3. **Streaming Handler** (`streaming_handler.py`): Processes agent events and sends them to the workflow via signals
4. **Main** (`main.py`): Sets up the Temporal client/worker and polls for events via queries

## Key Components

### Event Flow

```
Agent Execution (in Activity)
Streaming Handler
↓ (via Signal)
Workflow Event Queue
↓ (via Query)
Main Process (polling)
Display to User
```

### Dependencies

The [`AgentDependencies`][pydantic_ai_examples.temporal_streaming.datamodels.AgentDependencies] model passes workflow identification from the workflow to activities, enabling the streaming handler to send signals back to the correct workflow instance.

## Prerequisites

1. **Temporal Server**: Install and run Temporal locally

```bash
brew install temporal
temporal server start-dev
```

2. **Python Dependencies**: Install required packages

```bash
pip install pydantic-ai temporalio mcp-run-python pyyaml
```

3. **Configuration File**: Create an `app_conf.yml` file in your project root

```yaml
llm:
anthropic_api_key: ANTHROPIC_API_KEY # Will be read from environment variable
```

4. **Environment Variables**: Set your Anthropic API key

```bash
export ANTHROPIC_API_KEY='your-api-key-here'
```

## Running the Example

1. Make sure Temporal server is running:

```bash
temporal server start-dev
```

2. Set the configuration file path (optional, defaults to `./app_conf.yml`):

```bash
export APP_CONFIG_PATH=./app_conf.yml
```

3. Run the example:

```bash
python -m pydantic_ai_examples.temporal_streaming.main
```

## What to Expect

The example will:
1. Connect to Temporal server
2. Start a worker to handle workflows and activities
3. Execute the workflow with a sample financial query
4. Stream events as the agent:
- Calls tools (Yahoo Finance API, Python sandbox)
- Receives responses
- Generates the final result
5. Display all events in real-time
6. Show the final result

## Project Structure

```
temporal_streaming/
├── agents.py # Agent configuration with MCP toolsets
├── datamodels.py # Pydantic models for dependencies and events
├── main.py # Main entry point
├── streaming_handler.py # Event stream handler
├── utils.py # Configuration utilities
├── workflow.py # Temporal workflow definition
└── README.md # This file
```

## Customization

### Changing the Query

Edit the query in `main.py`:

```python
workflow_handle = await client.start_workflow(
YahooFinanceSearchWorkflow.run,
args=['Your custom financial query here'],
id=workflow_id,
task_queue=task_queue,
)
```

### Adding More Tools

Add tools to the agent in `agents.py`:

```python
@agent.tool(name='your_tool_name')
async def your_tool(ctx: RunContext[AgentDependencies], param: str) -> str:
# Your tool implementation
return result
```

### Modifying Event Handling

Customize what events are captured and displayed in `streaming_handler.py`.

## Key Concepts

### Why Streaming is Different in Temporal

Traditional streaming methods like [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] don't work in Temporal because:
- Activities cannot stream directly to the workflow
- The workflow and activity run in separate processes

### The Solution

This example uses:
- **Event Stream Handler**: Captures events during agent execution
- **Signals**: Push events from activities to the workflow
- **Queries**: Pull events from the workflow to the caller
- **Dependencies**: Pass workflow identification to enable signal routing

## Limitations

- Events are batched per model request/tool call rather than streamed token-by-token
- Query polling introduces a small delay in event delivery
- The workflow waits up to 10 seconds for events to be consumed before completing

## Learn More

- [Temporal Documentation](https://docs.temporal.io/)
- [Pydantic AI Temporal Integration](../../../docs/durable_execution/temporal.md)
- [Streaming with Pydantic AI](../../../docs/agents.md#streaming-all-events)
19 changes: 19 additions & 0 deletions examples/pydantic_ai_examples/temporal_streaming/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Temporal streaming example for Pydantic AI.

This example demonstrates how to implement streaming with Pydantic AI agents
in Temporal workflows using signals and queries.
"""

from .agents import build_agent
from .datamodels import AgentDependencies, EventKind, EventStream
from .streaming_handler import streaming_handler
from .workflow import YahooFinanceSearchWorkflow

__all__ = [
'build_agent',
'streaming_handler',
'YahooFinanceSearchWorkflow',
'AgentDependencies',
'EventKind',
'EventStream',
]
107 changes: 107 additions & 0 deletions examples/pydantic_ai_examples/temporal_streaming/agents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""Agent configuration for the Temporal streaming example.

This module defines the agent setup with MCP toolsets, model configuration,
and custom tools for data analysis.
"""
from datetime import timedelta
from typing import Any

from temporalio.common import RetryPolicy
from temporalio.workflow import ActivityConfig

from pydantic_ai import Agent, FilteredToolset, ModelSettings
from pydantic_ai.agent import EventStreamHandler
from pydantic_ai.durable_exec.temporal import TemporalAgent
from pydantic_ai.mcp import MCPServerStdio
from pydantic_ai.models.anthropic import AnthropicModel
from pydantic_ai.providers.anthropic import AnthropicProvider
from .datamodels import AgentDependencies


async def get_mcp_toolsets() -> dict[str, FilteredToolset[AgentDependencies]]:
"""
Initialize MCP toolsets for the agent.

Returns:
A dictionary mapping toolset names to filtered toolsets.
"""
yf_server = MCPServerStdio(
command='uvx',
args=['mcp-yahoo-finance'],
timeout=240,
read_timeout=240,
id='yahoo',
)
return {'yahoo': yf_server.filtered(lambda ctx, tool_def: True)}


async def get_claude_model(parallel_tool_calls: bool = True, **kwargs: Any) -> AnthropicModel:
"""
Create and configure the Claude model.

Args:
parallel_tool_calls: Whether to enable parallel tool calls.
**kwargs: Environment variables including API keys.

Returns:
Configured AnthropicModel instance.
"""
model_name: str = 'claude-sonnet-4-5-20250929'
api_key: str | None = kwargs.get('anthropic_api_key', None)
model: AnthropicModel = AnthropicModel(
model_name=model_name,
provider=AnthropicProvider(api_key=api_key),
settings=ModelSettings(
temperature=0.5,
max_tokens=64000,
parallel_tool_calls=parallel_tool_calls,
),
)

return model


async def build_agent(stream_handler: EventStreamHandler[AgentDependencies],
**kwargs: Any) -> TemporalAgent[AgentDependencies, str]:
"""
Build and configure the agent with tools and temporal settings.

Args:
stream_handler: Optional event stream handler for streaming responses.
**kwargs: Environment variables including API keys.

Returns:
TemporalAgent instance ready for use in Temporal workflows.
"""
system_prompt = """
You are an expert financial analyst that knows how to search for financial data on the web.
"""
agent_name = 'YahooFinanceSearchAgent'

toolsets = await get_mcp_toolsets()
agent: Agent[AgentDependencies, str] = Agent[AgentDependencies, str](
name=agent_name,
model=await get_claude_model(**kwargs),
toolsets=[*toolsets.values()],
system_prompt=system_prompt,
event_stream_handler=stream_handler,
deps_type=AgentDependencies,
)

temporal_agent = TemporalAgent(
wrapped=agent,
model_activity_config=ActivityConfig(
start_to_close_timeout=timedelta(minutes=5),
retry_policy=RetryPolicy(maximum_attempts=50),
),
toolset_activity_config={
toolset_id: ActivityConfig(
start_to_close_timeout=timedelta(minutes=3),
retry_policy=RetryPolicy(
maximum_attempts=3, non_retryable_error_types=['ToolRetryError']
),
)
for toolset_id in toolsets.keys()
},
)
return temporal_agent
11 changes: 11 additions & 0 deletions examples/pydantic_ai_examples/temporal_streaming/app_conf.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Configuration file for the Temporal Streaming example
#
# This file demonstrates how to configure API keys using environment variables.
# The value specified here (e.g., ANTHROPIC_API_KEY) will be replaced with the
# actual environment variable value at runtime.

llm:
# The anthropic_api_key will be read from the ANTHROPIC_API_KEY environment variable
# Make sure to set it before running the example:
# export ANTHROPIC_API_KEY='your-api-key-here'
anthropic_api_key: 'ANTHROPIC_API_KEY'
27 changes: 27 additions & 0 deletions examples/pydantic_ai_examples/temporal_streaming/datamodels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Data models for the temporal streaming example."""

from enum import Enum

from pydantic import BaseModel


class AgentDependencies(BaseModel):
"""Dependencies passed to the agent containing workflow identification."""

workflow_id: str
run_id: str


class EventKind(str, Enum):
"""Types of events that can be streamed."""

CONTINUE_CHAT = 'continue_chat'
EVENT = 'event'
RESULT = 'result'


class EventStream(BaseModel):
"""Event stream data model for streaming agent events."""

kind: EventKind
content: str
Loading
Loading