diff --git a/python/packages/azurefunctions/tests/integration_tests/test_03_callbacks.py b/python/packages/azurefunctions/tests/integration_tests/test_03_callbacks.py deleted file mode 100644 index 06414f993a..0000000000 --- a/python/packages/azurefunctions/tests/integration_tests/test_03_callbacks.py +++ /dev/null @@ -1,102 +0,0 @@ -# Copyright (c) Microsoft. All rights reserved. -""" -Integration Tests for Callbacks Sample - -Tests the callbacks sample for event tracking and management. - -The function app is automatically started by the test fixture. - -Prerequisites: -- Azure OpenAI credentials configured (see packages/azurefunctions/tests/integration_tests/.env.example) -- Azurite or Azure Storage account configured - -Usage: - uv run pytest packages/azurefunctions/tests/integration_tests/test_03_callbacks.py -v -""" - -from typing import Any - -import pytest -import requests - -from .testutils import ( - TIMEOUT, - SampleTestHelper, - skip_if_azure_functions_integration_tests_disabled, -) - -# Module-level markers - applied to all tests in this file -pytestmark = [ - pytest.mark.sample("03_callbacks"), - pytest.mark.usefixtures("function_app_for_test"), - skip_if_azure_functions_integration_tests_disabled, -] - - -class TestSampleCallbacks: - """Tests for 03_callbacks sample.""" - - @pytest.fixture(autouse=True) - def _set_base_url(self, base_url: str) -> None: - """Provide the callback agent base URL for each test.""" - self.base_url = f"{base_url}/api/agents/CallbackAgent" - - @staticmethod - def _wait_for_callback_events(base_url: str, thread_id: str) -> list[dict[str, Any]]: - events: list[dict[str, Any]] = [] - response = SampleTestHelper.get(f"{base_url}/callbacks/{thread_id}") - if response.status_code == 200: - events = response.json() - return events - - def test_agent_with_callbacks(self) -> None: - """Test agent execution with callback tracking.""" - thread_id = "test-callback" - - response = SampleTestHelper.post_json( - f"{self.base_url}/run", - {"message": "Tell me about Python", "thread_id": thread_id}, - ) - assert response.status_code == 200 - data = response.json() - - assert data["status"] == "success" - - events = self._wait_for_callback_events(self.base_url, thread_id) - - assert events - assert any(event.get("event_type") == "final" for event in events) - - def test_get_callbacks(self) -> None: - """Test retrieving callback events.""" - thread_id = "test-callback-retrieve" - - # Send a message first - SampleTestHelper.post_json( - f"{self.base_url}/run", - {"message": "Hello", "thread_id": thread_id, "wait_for_response": False}, - ) - - # Get callbacks - response = SampleTestHelper.get(f"{self.base_url}/callbacks/{thread_id}") - assert response.status_code == 200 - data = response.json() - assert isinstance(data, list) - - def test_delete_callbacks(self) -> None: - """Test clearing callback events.""" - thread_id = "test-callback-delete" - - # Send a message first - SampleTestHelper.post_json( - f"{self.base_url}/run", - {"message": "Test", "thread_id": thread_id, "wait_for_response": False}, - ) - - # Delete callbacks - response = requests.delete(f"{self.base_url}/callbacks/{thread_id}", timeout=TIMEOUT) - assert response.status_code == 204 - - -if __name__ == "__main__": - pytest.main([__file__, "-v"]) diff --git a/python/packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py b/python/packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py new file mode 100644 index 0000000000..44fb8efb2f --- /dev/null +++ b/python/packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py @@ -0,0 +1,125 @@ +# Copyright (c) Microsoft. All rights reserved. +""" +Integration Tests for Reliable Streaming Sample + +Tests the reliable streaming sample using Redis Streams for persistent message delivery. + +The function app is automatically started by the test fixture. + +Prerequisites: +- Azure OpenAI credentials configured (see packages/azurefunctions/tests/integration_tests/.env.example) +- Azurite or Azure Storage account configured +- Redis running (docker run -d --name redis -p 6379:6379 redis:latest) + +Usage: + uv run pytest packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py -v +""" + +import time + +import pytest +import requests + +from .testutils import ( + SampleTestHelper, + skip_if_azure_functions_integration_tests_disabled, +) + +# Module-level markers - applied to all tests in this file +pytestmark = [ + pytest.mark.sample("03_reliable_streaming"), + pytest.mark.usefixtures("function_app_for_test"), + skip_if_azure_functions_integration_tests_disabled, +] + + +class TestSampleReliableStreaming: + """Tests for 03_reliable_streaming sample.""" + + @pytest.fixture(autouse=True) + def _set_base_url(self, base_url: str) -> None: + """Provide the base URL for each test.""" + self.base_url = base_url + self.agent_url = f"{base_url}/api/agents/TravelPlanner" + self.stream_url = f"{base_url}/api/agent/stream" + + def test_agent_run_and_stream(self) -> None: + """Test agent execution with Redis streaming.""" + # Start agent run + response = SampleTestHelper.post_json( + f"{self.agent_url}/run", + {"message": "Plan a 1-day trip to Seattle in 1 sentence", "wait_for_response": False}, + ) + assert response.status_code == 202 + data = response.json() + + thread_id = data.get("thread_id") + + # Wait a moment for the agent to start writing to Redis + time.sleep(2) + + # Stream response from Redis with shorter timeout + # Note: We use text/plain to avoid SSE parsing complexity + stream_response = requests.get( + f"{self.stream_url}/{thread_id}", + headers={"Accept": "text/plain"}, + timeout=30, # Shorter timeout for test + ) + assert stream_response.status_code == 200 + + def test_stream_with_sse_format(self) -> None: + """Test streaming with Server-Sent Events format.""" + # Start agent run + response = SampleTestHelper.post_json( + f"{self.agent_url}/run", + {"message": "What's the weather like?", "wait_for_response": False}, + ) + assert response.status_code == 202 + data = response.json() + thread_id = data.get("thread_id") + + # Wait for agent to start writing + time.sleep(2) + + # Stream with SSE format + stream_response = requests.get( + f"{self.stream_url}/{thread_id}", + headers={"Accept": "text/event-stream"}, + timeout=30, # Shorter timeout + ) + assert stream_response.status_code == 200 + content_type = stream_response.headers.get("content-type", "") + assert "text/event-stream" in content_type + + # Check for SSE event markers if we got content + content = stream_response.text + if content: + assert "event:" in content or "data:" in content + + def test_stream_nonexistent_conversation(self) -> None: + """Test streaming from a non-existent conversation. + + The endpoint will wait for data in Redis, but since the conversation + doesn't exist, it will timeout. This is expected behavior. + """ + fake_id = "nonexistent-conversation-12345" + + # Should timeout since the conversation doesn't exist + with pytest.raises(requests.exceptions.ReadTimeout): + requests.get( + f"{self.stream_url}/{fake_id}", + headers={"Accept": "text/plain"}, + timeout=10, # Short timeout for non-existent ID + ) + + def test_health_endpoint(self) -> None: + """Test health check endpoint.""" + response = SampleTestHelper.get(f"{self.base_url}/api/health") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "healthy" + assert "agents" in data + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/python/samples/README.md b/python/samples/README.md index 7d291f119e..20798e4ad0 100644 --- a/python/samples/README.md +++ b/python/samples/README.md @@ -236,7 +236,7 @@ The recommended way to use Ollama is via the native `OllamaChatClient` from the |--------|-------------| | [`getting_started/azure_functions/01_single_agent/`](./getting_started/azure_functions/01_single_agent/) | Host a single agent in Azure Functions with Durable Extension HTTP endpoints and per-session state. | | [`getting_started/azure_functions/02_multi_agent/`](./getting_started/azure_functions/02_multi_agent/) | Register multiple agents in one function app with dedicated run routes and a health check endpoint. | -| [`getting_started/azure_functions/03_callbacks/`](./getting_started/azure_functions/03_callbacks/) | Capture streaming response telemetry via Durable Extension callbacks exposed through HTTP APIs. | +| [`getting_started/azure_functions/03_reliable_streaming/`](./getting_started/azure_functions/03_reliable_streaming/) | Implement reliable streaming for durable agents using Redis Streams with cursor-based resumption. | | [`getting_started/azure_functions/04_single_agent_orchestration_chaining/`](./getting_started/azure_functions/04_single_agent_orchestration_chaining/) | Chain sequential agent executions inside a durable orchestration while preserving the shared thread context. | | [`getting_started/azure_functions/05_multi_agent_orchestration_concurrency/`](./getting_started/azure_functions/05_multi_agent_orchestration_concurrency/) | Run two agents concurrently within a durable orchestration and combine their domain-specific outputs. | | [`getting_started/azure_functions/06_multi_agent_orchestration_conditionals/`](./getting_started/azure_functions/06_multi_agent_orchestration_conditionals/) | Route orchestration logic based on structured agent responses for spam detection and reply drafting. | diff --git a/python/samples/getting_started/azure_functions/03_callbacks/README.md b/python/samples/getting_started/azure_functions/03_callbacks/README.md deleted file mode 100644 index 09e50bcfd1..0000000000 --- a/python/samples/getting_started/azure_functions/03_callbacks/README.md +++ /dev/null @@ -1,83 +0,0 @@ -# Callback Telemetry Sample - -This sample demonstrates how to use the Durable Extension for Agent Framework's response callbacks to observe -streaming updates and final agent responses in real time. The `ConversationAuditTrail` callback -records each chunk received from the Azure OpenAI agent and exposes the collected events through -an HTTP API that can be polled by a web client or dashboard. - -## Highlights - -- Registers a default `AgentResponseCallbackProtocol` implementation that logs streaming and final - responses. -- Persists callback events in an in-memory store and exposes them via - `GET /api/agents/{agentName}/callbacks/{thread_id}`. -- Shows how to reset stored callback events with `DELETE /api/agents/{agentName}/callbacks/{thread_id}`. -- Works alongside the standard `/api/agents/{agentName}/run` endpoint so you can correlate callback - telemetry with agent responses. - -## Prerequisites - -Complete the shared environment setup steps in `../README.md`, including creating a virtual environment, installing dependencies, and configuring Azure OpenAI credentials and storage settings. - -> **Note:** This is a streaming example that currently uses a local in-memory store for simplicity. -> For distributed environments, consider using Redis, Service Bus, or another pub/sub mechanism for -> callback coordination. - -## Running the Sample - -Send a prompt to the agent: - -```bash -curl -X POST http://localhost:7071/api/agents/CallbackAgent/run \ - -H "Content-Type: application/json" \ - -d '{"message": "Tell me a short joke"}' -``` - -> **Note:** The run endpoint waits for the agent response by default. To return immediately, set the `x-ms-wait-for-response` header or include `"wait_for_response": false` in the request body. - -Poll callback telemetry (replace `` with the value from the POST response): - -```bash -curl http://localhost:7071/api/agents/CallbackAgent/callbacks/ -``` - -Reset stored events: - -```bash -curl -X DELETE http://localhost:7071/api/agents/CallbackAgent/callbacks/ -``` - -## Expected Output - -When you call `GET /api/agents/CallbackAgent/callbacks/{thread_id}` after sending a request to the agent, -the API returns a list of streaming and final callback events similar to the following: - -```json -[ - { - "timestamp": "2024-01-01T00:00:00Z", - "agent_name": "CallbackAgent", - "thread_id": "", - "correlation_id": "", - "request_message": "Tell me a short joke", - "event_type": "stream", - "update_kind": "text", - "text": "Sure, here's a joke..." - }, - { - "timestamp": "2024-01-01T00:00:01Z", - "agent_name": "CallbackAgent", - "thread_id": "", - "correlation_id": "", - "request_message": "Tell me a short joke", - "event_type": "final", - "response_text": "Why did the cloud...", - "usage": { - "type": "usage_details", - "input_token_count": 159, - "output_token_count": 29, - "total_token_count": 188 - } - } -] -``` diff --git a/python/samples/getting_started/azure_functions/03_callbacks/demo.http b/python/samples/getting_started/azure_functions/03_callbacks/demo.http deleted file mode 100644 index 771ed38027..0000000000 --- a/python/samples/getting_started/azure_functions/03_callbacks/demo.http +++ /dev/null @@ -1,30 +0,0 @@ -### Callback Sample - API Tests -### Use with VS Code REST Client or another HTTP testing tool. -### -### Endpoints introduced in this sample: -### - POST /api/agents/{agentName}/run : send a message to the agent -### - GET /api/agents/{agentName}/callbacks/{thread_id} : retrieve callback telemetry -### - DELETE /api/agents/{agentName}/callbacks/{thread_id} : clear stored callback events - -@baseUrl = http://localhost:7071 -@agentName = CallbackAgent -@agentRoute = {{baseUrl}}/api/agents/{{agentName}} -@thread_id = test-thread-00 - -### Health Check -GET {{baseUrl}}/api/health - -### Send message (callbacks will capture streaming + final response) -POST {{agentRoute}}/run -Content-Type: application/json - -{ - "message": "Generate a short weather update for Paris and mention streaming callbacks.", - "thread_id": "{{thread_id}}" -} - -### Inspect callback telemetry -GET {{agentRoute}}/callbacks/{{thread_id}} - -### Clear stored callback telemetry for the thread -DELETE {{agentRoute}}/callbacks/{{thread_id}} diff --git a/python/samples/getting_started/azure_functions/03_callbacks/function_app.py b/python/samples/getting_started/azure_functions/03_callbacks/function_app.py deleted file mode 100644 index e6702f6586..0000000000 --- a/python/samples/getting_started/azure_functions/03_callbacks/function_app.py +++ /dev/null @@ -1,185 +0,0 @@ -"""Capture agent response callbacks inside Azure Functions. - -Components used in this sample: -- AzureOpenAIChatClient to build an agent that streams interim updates. -- AgentFunctionApp with a default AgentResponseCallbackProtocol implementation. -- Azure Functions HTTP triggers that expose callback telemetry via REST. - -Prerequisites: set `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and either -`AZURE_OPENAI_API_KEY` or authenticate with Azure CLI before starting the Functions host.""" - -import json -import logging -from collections import defaultdict -from datetime import datetime, timezone -from typing import Any, DefaultDict - -import azure.functions as func -from agent_framework import AgentRunResponseUpdate -from agent_framework.azure import ( - AgentCallbackContext, - AgentFunctionApp, - AgentResponseCallbackProtocol, - AzureOpenAIChatClient, -) -from azure.identity import AzureCliCredential - -logger = logging.getLogger(__name__) - - -# 1. Maintain an in-memory store for callback events keyed by thread ID. -# NOTE: This is a streaming example using a local console logger. For distributed environments, -# consider using Redis or Service Bus for callback coordination across multiple instances. -CallbackStore = DefaultDict[str, list[dict[str, Any]]] -callback_events: CallbackStore = defaultdict(list) - - -def _serialize_usage(usage: Any) -> Any: - """Best-effort serialization for agent usage metadata.""" - - if usage is None: - return None - - model_dump = getattr(usage, "model_dump", None) - if callable(model_dump): - return model_dump() - - to_dict = getattr(usage, "to_dict", None) - if callable(to_dict): - return to_dict() - - return str(usage) - - -class ConversationAuditTrail(AgentResponseCallbackProtocol): - """Callback that records streaming chunks and final responses for later inspection.""" - - def __init__(self) -> None: - self._logger = logging.getLogger("durableagent.samples.callbacks.audit") - - async def on_streaming_response_update( - self, - update: AgentRunResponseUpdate, - context: AgentCallbackContext, - ) -> None: - event = self._build_base_event(context) - event.update( - { - "event_type": "stream", - "update_kind": getattr(update, "kind", "text"), - "text": getattr(update, "text", None), - } - ) - thread_id = context.thread_id or "" - callback_events[thread_id].append(event) - - preview = event.get("text") or event.get("update_kind") - self._logger.info( - "[%s][%s] streaming chunk: %s", - context.agent_name, - context.correlation_id, - preview, - ) - - async def on_agent_response(self, response, context: AgentCallbackContext) -> None: - event = self._build_base_event(context) - event.update( - { - "event_type": "final", - "response_text": getattr(response, "text", None), - "usage": _serialize_usage(getattr(response, "usage_details", None)), - } - ) - thread_id = context.thread_id or "" - callback_events[thread_id].append(event) - - self._logger.info( - "[%s][%s] final response recorded", - context.agent_name, - context.correlation_id, - ) - - @staticmethod - def _build_base_event(context: AgentCallbackContext) -> dict[str, Any]: - thread_id = context.thread_id - return { - "timestamp": datetime.now(timezone.utc).isoformat(), - "agent_name": context.agent_name, - "thread_id": thread_id, - "correlation_id": context.correlation_id, - "request_message": context.request_message, - } - - -# 2. Create the agent that will emit streaming updates and final responses. -callback_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( - name="CallbackAgent", - instructions=( - "You are a friendly assistant that narrates actions while responding. " - "Keep answers concise and acknowledge when callbacks capture streaming updates." - ), -) - - -# 3. Register the agent inside AgentFunctionApp with a default callback instance. -audit_callback = ConversationAuditTrail() -app = AgentFunctionApp(enable_health_check=True, default_callback=audit_callback) -app.add_agent(callback_agent) - - -@app.function_name("get_callback_events") -@app.route(route="agents/{agent_name}/callbacks/{thread_id}", methods=["GET"]) -async def get_callback_events(req: func.HttpRequest) -> func.HttpResponse: - """Return all callback events collected for a thread.""" - - thread_id = req.route_params.get("thread_id", "") - events = callback_events.get(thread_id, []) - return func.HttpResponse( - json.dumps(events, indent=2), - status_code=200, - mimetype="application/json", - ) - - -@app.function_name("reset_callback_events") -@app.route(route="agents/{agent_name}/callbacks/{thread_id}", methods=["DELETE"]) -async def reset_callback_events(req: func.HttpRequest) -> func.HttpResponse: - """Clear the stored callback events for a thread.""" - - thread_id = req.route_params.get("thread_id", "") - callback_events.pop(thread_id, None) - return func.HttpResponse(status_code=204) - - -""" -Expected output when querying `GET /api/agents/CallbackAgent/callbacks/{thread_id}`: - -HTTP/1.1 200 OK -[ - { - "timestamp": "2024-01-01T00:00:00Z", - "agent_name": "CallbackAgent", - "thread_id": "", - "correlation_id": "", - "request_message": "Tell me a short joke", - "event_type": "stream", - "update_kind": "text", - "text": "Sure, here's a joke..." - }, - { - "timestamp": "2024-01-01T00:00:01Z", - "agent_name": "CallbackAgent", - "thread_id": "", - "correlation_id": "", - "request_message": "Tell me a short joke", - "event_type": "final", - "response_text": "Why did the cloud...", - "usage": { - "type": "usage_details", - "input_token_count": 159, - "output_token_count": 29, - "total_token_count": 188 - } - } -] -""" diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md b/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md new file mode 100644 index 0000000000..181a338962 --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md @@ -0,0 +1,132 @@ +# Agent Response Callbacks with Redis Streaming + +This sample demonstrates how to use Redis Streams with agent response callbacks to enable reliable, resumable streaming for durable agents. Clients can disconnect and reconnect without losing messages by using cursor-based pagination. + +## Key Concepts Demonstrated + +- Using `AgentResponseCallbackProtocol` to capture streaming agent responses +- Persisting streaming chunks to Redis Streams for reliable delivery +- Building a custom HTTP endpoint to read from Redis with Server-Sent Events (SSE) format +- Supporting cursor-based resumption for disconnected clients +- Managing Redis client lifecycle with async context managers + +## Prerequisites + +In addition to the common setup steps in `../README.md`, this sample requires Redis: + +```bash +# Start Redis +docker run -d --name redis -p 6379:6379 redis:latest +``` + +Update `local.settings.json` with your Redis connection string: + +```json +{ + "Values": { + "REDIS_CONNECTION_STRING": "redis://localhost:6379" + } +} +``` + +## Running the Sample + +### Start the agent run + +The agent executes in the background via durable orchestration. The `RedisStreamCallback` persists streaming chunks to Redis: + +```bash +curl -X POST http://localhost:7071/api/agents/TravelPlanner/run \ + -H "Content-Type: text/plain" \ + -d "Plan a 3-day trip to Tokyo" +``` + +Response (202 Accepted): +```json +{ + "status": "accepted", + "response": "Agent request accepted", + "conversation_id": "abc-123-def-456", + "correlation_id": "xyz-789" +} +``` + +### Stream the response from Redis + +Use the custom `/api/agent/stream/{conversation_id}` endpoint to read persisted chunks: + +```bash +curl http://localhost:7071/api/agent/stream/abc-123-def-456 \ + -H "Accept: text/event-stream" +``` + +Response (SSE format): +``` +id: 1734649123456-0 +event: message +data: Here's a wonderful 3-day Tokyo itinerary... + +id: 1734649123789-0 +event: message +data: Day 1: Arrival and Shibuya... + +id: 1734649124012-0 +event: done +data: [DONE] +``` + +### Resume from a cursor + +Use a cursor ID from an SSE event to skip already-processed messages: + +```bash +curl "http://localhost:7071/api/agent/stream/abc-123-def-456?cursor=1734649123456-0" \ + -H "Accept: text/event-stream" +``` + +## How It Works + +### 1. Redis Callback + +The `RedisStreamCallback` class implements `AgentResponseCallbackProtocol` to capture streaming updates: + +```python +class RedisStreamCallback(AgentResponseCallbackProtocol): + async def on_streaming_response_update(self, update, context): + # Write chunk to Redis Stream + async with await get_stream_handler() as handler: + await handler.write_chunk(thread_id, update.text, sequence) + + async def on_agent_response(self, response, context): + # Write end-of-stream marker + async with await get_stream_handler() as handler: + await handler.write_completion(thread_id, sequence) +``` + +### 2. Custom Streaming Endpoint + +The `/api/agent/stream/{conversation_id}` endpoint reads from Redis: + +```python +@app.route(route="agent/stream/{conversation_id}", methods=["GET"]) +async def stream(req): + conversation_id = req.route_params.get("conversation_id") + cursor = req.params.get("cursor") # Optional + + async with await get_stream_handler() as handler: + async for chunk in handler.read_stream(conversation_id, cursor): + # Format and return chunks +``` + +### 3. Redis Streams + +Messages are stored in Redis Streams with automatic TTL (default: 10 minutes): + +``` +Stream Key: agent-stream:{conversation_id} +Entry: { + "text": "chunk content", + "sequence": "0", + "timestamp": "1734649123456" +} +``` \ No newline at end of file diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/demo.http b/python/samples/getting_started/azure_functions/03_reliable_streaming/demo.http new file mode 100644 index 0000000000..6cdc1d10c3 --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/demo.http @@ -0,0 +1,55 @@ +### Reliable Streaming with Redis - Demo HTTP Requests +### Use with the VS Code REST Client extension or any HTTP client +### +### Workflow: +### 1. POST /api/agents/{agentName}/run -> Start durable agent (returns conversation_id) +### 2. GET /api/agent/stream/{id} -> Read chunks from Redis (SSE or plain text) +### 3. Add ?cursor={id} to resume from a specific point +### +### Prerequisites: +### - Redis: docker run -d --name redis -p 6379:6379 redis:latest +### - Start function app: func start + +### Variables +@baseUrl = http://localhost:7071 +@agentName = TravelPlanner + +### Health Check +GET {{baseUrl}}/api/health + +### + +### Start Agent Run +# Starts the agent in the background via durable orchestration. +# The RedisStreamCallback persists streaming chunks to Redis. +# @name trip +POST {{baseUrl}}/api/agents/{{agentName}}/run +Content-Type: text/plain + +Plan a 3-day trip to Tokyo + +### + +### Stream from Redis (SSE format) +# Reads persisted chunks from Redis using cursor-based pagination. +# The conversation_id is automatically captured from the previous request. +@conversationId = {{trip.response.body.$.conversation_id}} +GET {{baseUrl}}/api/agent/stream/{{conversationId}} +Accept: text/event-stream + +### + +### Stream from Redis (plain text) +# Same as above, but returns plain text instead of SSE format +GET {{baseUrl}}/api/agent/stream/{{conversationId}} +Accept: text/plain + +### + +### Resume from cursor +# Use a cursor ID from an SSE event to skip already-processed messages +# Replace {cursor_id} with an actual entry ID from the SSE stream +GET {{baseUrl}}/api/agent/stream/{{conversationId}}?cursor={cursor_id} +Accept: text/event-stream + +### diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/function_app.py b/python/samples/getting_started/azure_functions/03_reliable_streaming/function_app.py new file mode 100644 index 0000000000..31db10a9df --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/function_app.py @@ -0,0 +1,322 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Reliable streaming for durable agents using Redis Streams. + +This sample demonstrates how to implement reliable streaming for durable agents using Redis Streams. + +Components used in this sample: +- AzureOpenAIChatClient to create the travel planner agent with tools. +- AgentFunctionApp with a Redis-based callback for persistent streaming. +- Custom HTTP endpoint to resume streaming from any point using cursor-based pagination. + +Prerequisites: +- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME +- Redis running (docker run -d --name redis -p 6379:6379 redis:latest) +- DTS and Azurite running (see parent README) +""" + +import logging +import os +from datetime import timedelta + +import redis.asyncio as aioredis +from agent_framework import AgentRunResponseUpdate +import azure.functions as func +from agent_framework.azure import ( + AgentCallbackContext, + AgentFunctionApp, + AgentResponseCallbackProtocol, + AzureOpenAIChatClient, +) +from azure.identity import AzureCliCredential + +from redis_stream_response_handler import RedisStreamResponseHandler, StreamChunk +from tools import get_local_events, get_weather_forecast + +logger = logging.getLogger(__name__) + +# Configuration +REDIS_CONNECTION_STRING = os.environ.get("REDIS_CONNECTION_STRING", "redis://localhost:6379") +REDIS_STREAM_TTL_MINUTES = int(os.environ.get("REDIS_STREAM_TTL_MINUTES", "10")) + +async def get_stream_handler() -> RedisStreamResponseHandler: + """Create a new Redis stream handler for each request. + + This avoids event loop conflicts in Azure Functions by creating + a fresh Redis client in the current event loop context. + """ + # Create a new Redis client in the current event loop + redis_client = aioredis.from_url( + REDIS_CONNECTION_STRING, + encoding="utf-8", + decode_responses=False, + ) + + return RedisStreamResponseHandler( + redis_client=redis_client, + stream_ttl=timedelta(minutes=REDIS_STREAM_TTL_MINUTES), + ) + + +class RedisStreamCallback(AgentResponseCallbackProtocol): + """Callback that writes streaming updates to Redis Streams for reliable delivery. + + This enables clients to disconnect and reconnect without losing messages. + """ + + def __init__(self) -> None: + self._logger = logging.getLogger("durableagent.samples.redis_streaming") + self._sequence_numbers = {} # Track sequence per thread + + async def on_streaming_response_update( + self, + update: AgentRunResponseUpdate, + context: AgentCallbackContext, + ) -> None: + """Write streaming update to Redis Stream. + + Args: + update: The streaming response update chunk. + context: The callback context with thread_id, agent_name, etc. + """ + thread_id = context.thread_id + if not thread_id: + self._logger.warning("No thread_id available for streaming update") + return + + if not update.text: + return + + text = update.text + + # Get or initialize sequence number for this thread + if thread_id not in self._sequence_numbers: + self._sequence_numbers[thread_id] = 0 + + sequence = self._sequence_numbers[thread_id] + + try: + # Use context manager to ensure Redis client is properly closed + async with await get_stream_handler() as stream_handler: + # Write chunk to Redis Stream using public API + await stream_handler.write_chunk(thread_id, text, sequence) + + self._sequence_numbers[thread_id] += 1 + + self._logger.info( + "[%s][%s] Wrote chunk to Redis: seq=%d, text=%s", + context.agent_name, + thread_id[:8], + sequence, + text, + ) + except Exception as ex: + self._logger.error(f"Error writing to Redis stream: {ex}", exc_info=True) + + async def on_agent_response(self, response, context: AgentCallbackContext) -> None: + """Write end-of-stream marker when agent completes. + + Args: + response: The final agent response. + context: The callback context. + """ + thread_id = context.thread_id + if not thread_id: + return + + sequence = self._sequence_numbers.get(thread_id, 0) + + try: + # Use context manager to ensure Redis client is properly closed + async with await get_stream_handler() as stream_handler: + # Write end-of-stream marker using public API + await stream_handler.write_completion(thread_id, sequence) + + self._logger.info( + "[%s][%s] Agent completed, wrote end-of-stream marker", + context.agent_name, + thread_id[:8], + ) + + # Clean up sequence tracker + self._sequence_numbers.pop(thread_id, None) + except Exception as ex: + self._logger.error(f"Error writing end-of-stream marker: {ex}", exc_info=True) + + +# Create the Redis streaming callback +redis_callback = RedisStreamCallback() + + +# Create the travel planner agent +def create_travel_agent(): + """Create the TravelPlanner agent with tools.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + name="TravelPlanner", + instructions="""You are an expert travel planner who creates detailed, personalized travel itineraries. +When asked to plan a trip, you should: +1. Create a comprehensive day-by-day itinerary +2. Include specific recommendations for activities, restaurants, and attractions +3. Provide practical tips for each destination +4. Consider weather and local events when making recommendations +5. Include estimated times and logistics between activities + +Always use the available tools to get current weather forecasts and local events +for the destination to make your recommendations more relevant and timely. + +Format your response with clear headings for each day and include emoji icons +to make the itinerary easy to scan and visually appealing.""", + tools=[get_weather_forecast, get_local_events], + ) + + +# Create AgentFunctionApp with the Redis callback +app = AgentFunctionApp( + agents=[create_travel_agent()], + enable_health_check=True, + default_callback=redis_callback, + max_poll_retries=100, # Increase for longer-running agents +) + + +# Custom streaming endpoint for reading from Redis +# Use the standard /api/agents/TravelPlanner/run endpoint to start agent runs + + +@app.function_name("stream") +@app.route(route="agent/stream/{conversation_id}", methods=["GET"]) +async def stream(req: func.HttpRequest) -> func.HttpResponse: + """Resume streaming from a specific cursor position for an existing session. + + This endpoint reads all currently available chunks from Redis for the given + conversation ID, starting from the specified cursor (or beginning if no cursor). + + Use this endpoint to resume a stream after disconnection. Pass the conversation ID + and optionally a cursor (Redis entry ID) to continue from where you left off. + + Query Parameters: + cursor (optional): Redis stream entry ID to resume from. If not provided, starts from beginning. + + Response Headers: + Content-Type: text/event-stream or text/plain based on Accept header + x-conversation-id: The conversation/thread ID + + SSE Event Fields (when Accept: text/event-stream): + id: Redis stream entry ID (use as cursor for resumption) + event: "message" for content, "done" for completion, "error" for errors + data: The text content or status message + """ + try: + conversation_id = req.route_params.get("conversation_id") + if not conversation_id: + return func.HttpResponse( + "Conversation ID is required.", + status_code=400, + ) + + # Get optional cursor from query string + cursor = req.params.get("cursor") + + logger.info( + f"Resuming stream for conversation {conversation_id} from cursor: {cursor or '(beginning)'}" + ) + + # Check Accept header to determine response format + accept_header = req.headers.get("Accept", "") + use_sse_format = "text/plain" not in accept_header.lower() + + # Stream chunks from Redis + return await _stream_to_client(conversation_id, cursor, use_sse_format) + + except Exception as ex: + logger.error(f"Error in stream endpoint: {ex}", exc_info=True) + return func.HttpResponse( + f"Internal server error: {str(ex)}", + status_code=500, + ) + + +async def _stream_to_client( + conversation_id: str, + cursor: str | None, + use_sse_format: bool, +) -> func.HttpResponse: + """Stream chunks from Redis to the HTTP response. + + Args: + conversation_id: The conversation ID to stream from. + cursor: Optional cursor to resume from. If None, streams from the beginning. + use_sse_format: True to use SSE format, false for plain text. + + Returns: + HTTP response with all currently available chunks. + """ + chunks = [] + + # Use context manager to ensure Redis client is properly closed + async with await get_stream_handler() as stream_handler: + try: + async for chunk in stream_handler.read_stream(conversation_id, cursor): + if chunk.error: + logger.warning(f"Stream error for {conversation_id}: {chunk.error}") + chunks.append(_format_error(chunk.error, use_sse_format)) + break + + if chunk.is_done: + chunks.append(_format_end_of_stream(chunk.entry_id, use_sse_format)) + break + + if chunk.text: + chunks.append(_format_chunk(chunk, use_sse_format)) + + except Exception as ex: + logger.error(f"Error reading from Redis: {ex}", exc_info=True) + chunks.append(_format_error(str(ex), use_sse_format)) + + # Return all chunks + response_body = "".join(chunks) + + return func.HttpResponse( + body=response_body, + mimetype="text/event-stream" if use_sse_format else "text/plain; charset=utf-8", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "x-conversation-id": conversation_id, + }, + ) + + +def _format_chunk(chunk: StreamChunk, use_sse_format: bool) -> str: + """Format a text chunk.""" + if use_sse_format: + return _format_sse_event("message", chunk.text, chunk.entry_id) + else: + return chunk.text + + +def _format_end_of_stream(entry_id: str, use_sse_format: bool) -> str: + """Format end-of-stream marker.""" + if use_sse_format: + return _format_sse_event("done", "[DONE]", entry_id) + else: + return "\n" + + +def _format_error(error: str, use_sse_format: bool) -> str: + """Format error message.""" + if use_sse_format: + return _format_sse_event("error", error, None) + else: + return f"\n[Error: {error}]\n" + + +def _format_sse_event(event_type: str, data: str, event_id: str | None = None) -> str: + """Format a Server-Sent Event.""" + lines = [] + if event_id: + lines.append(f"id: {event_id}") + lines.append(f"event: {event_type}") + lines.append(f"data: {data}") + lines.append("") + return "\n".join(lines) + "\n" diff --git a/python/samples/getting_started/azure_functions/03_callbacks/host.json b/python/samples/getting_started/azure_functions/03_reliable_streaming/host.json similarity index 100% rename from python/samples/getting_started/azure_functions/03_callbacks/host.json rename to python/samples/getting_started/azure_functions/03_reliable_streaming/host.json diff --git a/python/samples/getting_started/azure_functions/03_callbacks/local.settings.json.template b/python/samples/getting_started/azure_functions/03_reliable_streaming/local.settings.json.template similarity index 74% rename from python/samples/getting_started/azure_functions/03_callbacks/local.settings.json.template rename to python/samples/getting_started/azure_functions/03_reliable_streaming/local.settings.json.template index 7d6ef15f82..b87786468e 100644 --- a/python/samples/getting_started/azure_functions/03_callbacks/local.settings.json.template +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/local.settings.json.template @@ -7,6 +7,8 @@ "TASKHUB_NAME": "default", "AZURE_OPENAI_ENDPOINT": "", "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "", - "AZURE_OPENAI_API_KEY": "" + "AZURE_OPENAI_API_KEY": "", + "REDIS_CONNECTION_STRING": "redis://localhost:6379", + "REDIS_STREAM_TTL_MINUTES": "10" } } diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/redis_stream_response_handler.py b/python/samples/getting_started/azure_functions/03_reliable_streaming/redis_stream_response_handler.py new file mode 100644 index 0000000000..e6d60735bf --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/redis_stream_response_handler.py @@ -0,0 +1,200 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Redis-based streaming response handler for durable agents. + +This module provides reliable, resumable streaming of agent responses using Redis Streams +as a message broker. It enables clients to disconnect and reconnect without losing messages. +""" + +import asyncio +import time +from dataclasses import dataclass +from datetime import timedelta +from collections.abc import AsyncIterator + +import redis.asyncio as aioredis + + +@dataclass +class StreamChunk: + """Represents a chunk of streamed data from Redis. + + Attributes: + entry_id: The Redis stream entry ID (used as cursor for resumption). + text: The text content of the chunk, if any. + is_done: Whether this is the final chunk in the stream. + error: Error message if an error occurred, otherwise None. + """ + entry_id: str + text: str | None = None + is_done: bool = False + error: str | None = None + + +class RedisStreamResponseHandler: + """Handles agent responses by persisting them to Redis Streams. + + This handler writes agent response updates to Redis Streams, enabling reliable, + resumable streaming delivery to clients. Clients can disconnect and reconnect + at any point using cursor-based pagination. + + Attributes: + MAX_EMPTY_READS: Maximum number of empty reads before timing out. + POLL_INTERVAL_MS: Interval in milliseconds between polling attempts. + """ + + MAX_EMPTY_READS = 300 + POLL_INTERVAL_MS = 1000 + + def __init__(self, redis_client: aioredis.Redis, stream_ttl: timedelta): + """Initialize the Redis stream response handler. + + Args: + redis_client: The async Redis client instance. + stream_ttl: Time-to-live for stream entries in Redis. + """ + self._redis = redis_client + self._stream_ttl = stream_ttl + + async def __aenter__(self): + """Enter async context manager.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Exit async context manager and close Redis connection.""" + await self._redis.aclose() + + async def write_chunk( + self, + conversation_id: str, + text: str, + sequence: int, + ) -> None: + """Write a single text chunk to the Redis Stream. + + Args: + conversation_id: The conversation ID for this agent run. + text: The text content to write. + sequence: The sequence number for ordering. + """ + stream_key = self._get_stream_key(conversation_id) + await self._redis.xadd( + stream_key, + { + "text": text, + "sequence": str(sequence), + "timestamp": str(int(time.time() * 1000)), + } + ) + await self._redis.expire(stream_key, self._stream_ttl) + + async def write_completion( + self, + conversation_id: str, + sequence: int, + ) -> None: + """Write an end-of-stream marker to the Redis Stream. + + Args: + conversation_id: The conversation ID for this agent run. + sequence: The final sequence number. + """ + stream_key = self._get_stream_key(conversation_id) + await self._redis.xadd( + stream_key, + { + "text": "", + "sequence": str(sequence), + "timestamp": str(int(time.time() * 1000)), + "done": "true", + } + ) + await self._redis.expire(stream_key, self._stream_ttl) + + async def read_stream( + self, + conversation_id: str, + cursor: str | None = None, + ) -> AsyncIterator[StreamChunk]: + """Read entries from a Redis Stream with cursor-based pagination. + + This method polls the Redis Stream for new entries, yielding chunks as they + become available. Clients can resume from any point using the entry_id from + a previous chunk. + + Args: + conversation_id: The conversation ID to read from. + cursor: Optional cursor to resume from. If None, starts from beginning. + + Yields: + StreamChunk instances containing text content or status markers. + """ + stream_key = self._get_stream_key(conversation_id) + start_id = cursor if cursor else "0-0" + + empty_read_count = 0 + has_seen_data = False + + while True: + try: + # Read up to 100 entries from the stream + entries = await self._redis.xread( + {stream_key: start_id}, + count=100, + block=None, + ) + + if not entries: + # No entries found + if not has_seen_data: + empty_read_count += 1 + if empty_read_count >= self.MAX_EMPTY_READS: + timeout_seconds = self.MAX_EMPTY_READS * self.POLL_INTERVAL_MS / 1000 + yield StreamChunk( + entry_id=start_id, + error=f"Stream not found or timed out after {timeout_seconds} seconds" + ) + return + + # Wait before polling again + await asyncio.sleep(self.POLL_INTERVAL_MS / 1000) + continue + + has_seen_data = True + + # Process entries from the stream + for stream_name, stream_entries in entries: + for entry_id, entry_data in stream_entries: + start_id = entry_id.decode() if isinstance(entry_id, bytes) else entry_id + + # Decode entry data + text = entry_data.get(b"text", b"").decode() if b"text" in entry_data else None + done = entry_data.get(b"done", b"").decode() if b"done" in entry_data else None + error = entry_data.get(b"error", b"").decode() if b"error" in entry_data else None + + if error: + yield StreamChunk(entry_id=start_id, error=error) + return + + if done == "true": + yield StreamChunk(entry_id=start_id, is_done=True) + return + + if text: + yield StreamChunk(entry_id=start_id, text=text) + + except Exception as ex: + yield StreamChunk(entry_id=start_id, error=str(ex)) + return + + @staticmethod + def _get_stream_key(conversation_id: str) -> str: + """Generate the Redis key for a conversation's stream. + + Args: + conversation_id: The conversation ID. + + Returns: + The Redis stream key. + """ + return f"agent-stream:{conversation_id}" diff --git a/python/samples/getting_started/azure_functions/03_callbacks/requirements.txt b/python/samples/getting_started/azure_functions/03_reliable_streaming/requirements.txt similarity index 59% rename from python/samples/getting_started/azure_functions/03_callbacks/requirements.txt rename to python/samples/getting_started/azure_functions/03_reliable_streaming/requirements.txt index 8aa2c75d80..8b3943b92f 100644 --- a/python/samples/getting_started/azure_functions/03_callbacks/requirements.txt +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/requirements.txt @@ -1,2 +1,3 @@ agent-framework-azurefunctions -azure-identity \ No newline at end of file +azure-identity +redis diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/tools.py b/python/samples/getting_started/azure_functions/03_reliable_streaming/tools.py new file mode 100644 index 0000000000..6a71fdfa03 --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/tools.py @@ -0,0 +1,165 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Mock travel tools for demonstration purposes. + +In a real application, these would call actual weather and events APIs. +""" + +from typing import Annotated + + +def get_weather_forecast( + destination: Annotated[str, "The destination city or location"], + date: Annotated[str, 'The date for the forecast (e.g., "2025-01-15" or "next Monday")'], +) -> str: + """Get the weather forecast for a destination on a specific date. + + Use this to provide weather-aware recommendations in the itinerary. + + Args: + destination: The destination city or location. + date: The date for the forecast. + + Returns: + A weather forecast summary. + """ + # Mock weather data based on destination for realistic responses + weather_by_region = { + "Tokyo": ("Partly cloudy with a chance of light rain", 58, 45), + "Paris": ("Overcast with occasional drizzle", 52, 41), + "New York": ("Clear and cold", 42, 28), + "London": ("Foggy morning, clearing in afternoon", 48, 38), + "Sydney": ("Sunny and warm", 82, 68), + "Rome": ("Sunny with light breeze", 62, 48), + "Barcelona": ("Partly sunny", 59, 47), + "Amsterdam": ("Cloudy with light rain", 46, 38), + "Dubai": ("Sunny and hot", 85, 72), + "Singapore": ("Tropical thunderstorms in afternoon", 88, 77), + "Bangkok": ("Hot and humid, afternoon showers", 91, 78), + "Los Angeles": ("Sunny and pleasant", 72, 55), + "San Francisco": ("Morning fog, afternoon sun", 62, 52), + "Seattle": ("Rainy with breaks", 48, 40), + "Miami": ("Warm and sunny", 78, 65), + "Honolulu": ("Tropical paradise weather", 82, 72), + } + + # Find a matching destination or use a default + forecast = ("Partly cloudy", 65, 50) + for city, weather in weather_by_region.items(): + if city.lower() in destination.lower(): + forecast = weather + break + + condition, high_f, low_f = forecast + high_c = (high_f - 32) * 5 // 9 + low_c = (low_f - 32) * 5 // 9 + + recommendation = _get_weather_recommendation(condition) + + return f"""Weather forecast for {destination} on {date}: +Conditions: {condition} +High: {high_f}°F ({high_c}°C) +Low: {low_f}°F ({low_c}°C) + +Recommendation: {recommendation}""" + + +def get_local_events( + destination: Annotated[str, "The destination city or location"], + date: Annotated[str, 'The date to search for events (e.g., "2025-01-15" or "next week")'], +) -> str: + """Get local events and activities happening at a destination around a specific date. + + Use this to suggest timely activities and experiences. + + Args: + destination: The destination city or location. + date: The date to search for events. + + Returns: + A list of local events and activities. + """ + # Mock events data based on destination + events_by_city = { + "Tokyo": [ + "🎭 Kabuki Theater Performance at Kabukiza Theatre - Traditional Japanese drama", + "🌸 Winter Illuminations at Yoyogi Park - Spectacular light displays", + "🍜 Ramen Festival at Tokyo Station - Sample ramen from across Japan", + "🎮 Gaming Expo at Tokyo Big Sight - Latest video games and technology", + ], + "Paris": [ + "🎨 Impressionist Exhibition at Musée d'Orsay - Extended evening hours", + "🍷 Wine Tasting Tour in Le Marais - Local sommelier guided", + "🎵 Jazz Night at Le Caveau de la Huchette - Historic jazz club", + "🥐 French Pastry Workshop - Learn from master pâtissiers", + ], + "New York": [ + "🎭 Broadway Show: Hamilton - Limited engagement performances", + "🏀 Knicks vs Lakers at Madison Square Garden", + "🎨 Modern Art Exhibit at MoMA - New installations", + "🍕 Pizza Walking Tour of Brooklyn - Artisan pizzerias", + ], + "London": [ + "👑 Royal Collection Exhibition at Buckingham Palace", + "🎭 West End Musical: The Phantom of the Opera", + "🍺 Craft Beer Festival at Brick Lane", + "🎪 Winter Wonderland at Hyde Park - Rides and markets", + ], + "Sydney": [ + "🏄 Pro Surfing Competition at Bondi Beach", + "🎵 Opera at Sydney Opera House - La Bohème", + "🦘 Wildlife Night Safari at Taronga Zoo", + "🍽️ Harbor Dinner Cruise with fireworks", + ], + "Rome": [ + "🏛️ After-Hours Vatican Tour - Skip the crowds", + "🍝 Pasta Making Class in Trastevere", + "🎵 Classical Concert at Borghese Gallery", + "🍷 Wine Tasting in Roman Cellars", + ], + } + + # Find events for the destination or use generic events + events = [ + "🎭 Local theater performance", + "🍽️ Food and wine festival", + "🎨 Art gallery opening", + "🎵 Live music at local venues", + ] + + for city, city_events in events_by_city.items(): + if city.lower() in destination.lower(): + events = city_events + break + + event_list = "\n• ".join(events) + return f"""Local events in {destination} around {date}: + +• {event_list} + +💡 Tip: Book popular events in advance as they may sell out quickly!""" + + +def _get_weather_recommendation(condition: str) -> str: + """Get a recommendation based on weather conditions. + + Args: + condition: The weather condition description. + + Returns: + A recommendation string. + """ + condition_lower = condition.lower() + + if "rain" in condition_lower or "drizzle" in condition_lower: + return "Bring an umbrella and waterproof jacket. Consider indoor activities for backup." + elif "fog" in condition_lower: + return "Morning visibility may be limited. Plan outdoor sightseeing for afternoon." + elif "cold" in condition_lower: + return "Layer up with warm clothing. Hot drinks and cozy cafés recommended." + elif "hot" in condition_lower or "warm" in condition_lower: + return "Stay hydrated and use sunscreen. Plan strenuous activities for cooler morning hours." + elif "thunder" in condition_lower or "storm" in condition_lower: + return "Keep an eye on weather updates. Have indoor alternatives ready." + else: + return "Pleasant conditions expected. Great day for outdoor exploration!"