diff --git a/.github/actions/azure-functions-integration-setup/action.yml b/.github/actions/azure-functions-integration-setup/action.yml index 6be5afb814..357168d92e 100644 --- a/.github/actions/azure-functions-integration-setup/action.yml +++ b/.github/actions/azure-functions-integration-setup/action.yml @@ -28,6 +28,18 @@ runs: echo "Waiting for Azurite (Azure Storage emulator) to be ready" timeout 30 bash -c 'until curl --silent http://localhost:10000/devstoreaccount1; do sleep 1; done' echo "Azurite (Azure Storage emulator) is ready" + - name: Start Redis + shell: bash + run: | + if [ "$(docker ps -aq -f name=redis)" ]; then + echo "Stopping and removing existing Redis" + docker rm -f redis + fi + echo "Starting Redis" + docker run -d --name redis -p 6379:6379 redis:latest + echo "Waiting for Redis to be ready" + timeout 30 bash -c 'until docker exec redis redis-cli ping | grep -q PONG; do sleep 1; done' + echo "Redis is ready" - name: Install Azure Functions Core Tools shell: bash run: | diff --git a/dotnet/Directory.Packages.props b/dotnet/Directory.Packages.props index 95d35508e1..9202b36f2a 100644 --- a/dotnet/Directory.Packages.props +++ b/dotnet/Directory.Packages.props @@ -125,6 +125,8 @@ + + diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index 5e08a766f9..002efdbab1 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -33,6 +33,7 @@ + diff --git a/dotnet/samples/AzureFunctions/08_ReliableStreaming/08_ReliableStreaming.csproj b/dotnet/samples/AzureFunctions/08_ReliableStreaming/08_ReliableStreaming.csproj new file mode 100644 index 0000000000..df0b60a4a1 --- /dev/null +++ b/dotnet/samples/AzureFunctions/08_ReliableStreaming/08_ReliableStreaming.csproj @@ -0,0 +1,47 @@ + + + net10.0 + v4 + Exe + enable + enable + + ReliableStreaming + ReliableStreaming + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/dotnet/samples/AzureFunctions/08_ReliableStreaming/FunctionTriggers.cs b/dotnet/samples/AzureFunctions/08_ReliableStreaming/FunctionTriggers.cs new file mode 100644 index 0000000000..e642b64337 --- /dev/null +++ b/dotnet/samples/AzureFunctions/08_ReliableStreaming/FunctionTriggers.cs @@ -0,0 +1,320 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Text; +using Microsoft.Agents.AI; +using Microsoft.Agents.AI.DurableTask; +using Microsoft.Agents.AI.Hosting.AzureFunctions; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Http.Features; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Azure.Functions.Worker; +using Microsoft.DurableTask.Client; +using Microsoft.Extensions.Logging; + +namespace ReliableStreaming; + +/// +/// HTTP trigger functions for reliable streaming of durable agent responses. +/// +/// +/// This class exposes two endpoints: +/// +/// +/// Create +/// Starts an agent run and streams responses. The response format depends on the +/// Accept header: text/plain returns raw text (ideal for terminals), while +/// text/event-stream or any other value returns Server-Sent Events (SSE). +/// +/// +/// Stream +/// Resumes a stream from a cursor position, enabling reliable message delivery +/// +/// +/// +public sealed class FunctionTriggers +{ + private readonly RedisStreamResponseHandler _streamHandler; + private readonly ILogger _logger; + + /// + /// Initializes a new instance of the class. + /// + /// The Redis stream handler for reading/writing agent responses. + /// The logger instance. + public FunctionTriggers(RedisStreamResponseHandler streamHandler, ILogger logger) + { + this._streamHandler = streamHandler; + this._logger = logger; + } + + /// + /// Creates a new agent session, starts an agent run with the provided prompt, + /// and streams the response back to the client. + /// + /// + /// + /// The response format depends on the Accept header: + /// + /// text/plain: Returns raw text output, ideal for terminal display with curl + /// text/event-stream or other: Returns Server-Sent Events (SSE) with cursor support + /// + /// + /// + /// The response includes an x-conversation-id header containing the conversation ID. + /// For SSE responses, clients can use this conversation ID to resume the stream if disconnected + /// by calling the endpoint with the conversation ID and the last received cursor. + /// + /// + /// Each SSE event contains the following fields: + /// + /// id: The Redis stream entry ID (use as cursor for resumption) + /// event: Either "message" for content or "done" for stream completion + /// data: The text content of the response chunk + /// + /// + /// + /// The HTTP request containing the prompt in the body. + /// The Durable Task client for signaling agents. + /// The function invocation context. + /// Cancellation token. + /// A streaming response in the format specified by the Accept header. + [Function(nameof(CreateAsync))] + public async Task CreateAsync( + [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "agent/create")] HttpRequest request, + [DurableClient] DurableTaskClient durableClient, + FunctionContext context, + CancellationToken cancellationToken) + { + // Read the prompt from the request body + string prompt = await new StreamReader(request.Body).ReadToEndAsync(cancellationToken); + if (string.IsNullOrWhiteSpace(prompt)) + { + return new BadRequestObjectResult("Request body must contain a prompt."); + } + + AIAgent agentProxy = durableClient.AsDurableAgentProxy(context, "TravelPlanner"); + + // Create a new agent thread + AgentThread thread = agentProxy.GetNewThread(); + AgentThreadMetadata metadata = thread.GetService() + ?? throw new InvalidOperationException("Failed to get AgentThreadMetadata from new thread."); + + this._logger.LogInformation("Creating new agent session: {ConversationId}", metadata.ConversationId); + + // Run the agent in the background (fire-and-forget) + DurableAgentRunOptions options = new() { IsFireAndForget = true }; + await agentProxy.RunAsync(prompt, thread, options, cancellationToken); + + this._logger.LogInformation("Agent run started for session: {ConversationId}", metadata.ConversationId); + + // Check Accept header to determine response format + // text/plain = raw text output (ideal for terminals) + // text/event-stream or other = SSE format (supports resumption) + string? acceptHeader = request.Headers.Accept.FirstOrDefault(); + bool useSseFormat = acceptHeader?.Contains("text/plain", StringComparison.OrdinalIgnoreCase) != true; + + return await this.StreamToClientAsync( + conversationId: metadata.ConversationId!, cursor: null, useSseFormat, request.HttpContext, cancellationToken); + } + + /// + /// Resumes streaming from a specific cursor position for an existing session. + /// + /// + /// + /// Use this endpoint to resume a stream after disconnection. Pass the conversation ID + /// (from the x-conversation-id response header) and the last received cursor + /// (Redis stream entry ID) to continue from where you left off. + /// + /// + /// If no cursor is provided, streaming starts from the beginning of the stream. + /// This allows clients to replay the entire response if needed. + /// + /// + /// The response format depends on the Accept header: + /// + /// text/plain: Returns raw text output, ideal for terminal display with curl + /// text/event-stream or other: Returns Server-Sent Events (SSE) with cursor support + /// + /// + /// + /// The HTTP request. Use the cursor query parameter to specify the cursor position. + /// The conversation ID to stream from. + /// Cancellation token. + /// A streaming response in the format specified by the Accept header. + [Function(nameof(StreamAsync))] + public async Task StreamAsync( + [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "agent/stream/{conversationId}")] HttpRequest request, + string conversationId, + CancellationToken cancellationToken) + { + if (string.IsNullOrWhiteSpace(conversationId)) + { + return new BadRequestObjectResult("Conversation ID is required."); + } + + // Get the cursor from query string (optional) + string? cursor = request.Query["cursor"].FirstOrDefault(); + + this._logger.LogInformation( + "Resuming stream for conversation {ConversationId} from cursor: {Cursor}", + conversationId, + cursor ?? "(beginning)"); + + // Check Accept header to determine response format + // text/plain = raw text output (ideal for terminals) + // text/event-stream or other = SSE format (supports cursor-based resumption) + string? acceptHeader = request.Headers.Accept.FirstOrDefault(); + bool useSseFormat = acceptHeader?.Contains("text/plain", StringComparison.OrdinalIgnoreCase) != true; + + return await this.StreamToClientAsync(conversationId, cursor, useSseFormat, request.HttpContext, cancellationToken); + } + + /// + /// Streams chunks from the Redis stream to the HTTP response. + /// + /// The conversation ID to stream from. + /// Optional cursor to resume from. If null, streams from the beginning. + /// True to use SSE format, false for plain text. + /// The HTTP context for writing the response. + /// Cancellation token. + /// An empty result after streaming completes. + private async Task StreamToClientAsync( + string conversationId, + string? cursor, + bool useSseFormat, + HttpContext httpContext, + CancellationToken cancellationToken) + { + // Set response headers based on format + httpContext.Response.Headers.ContentType = useSseFormat + ? "text/event-stream" + : "text/plain; charset=utf-8"; + httpContext.Response.Headers.CacheControl = "no-cache"; + httpContext.Response.Headers.Connection = "keep-alive"; + httpContext.Response.Headers["x-conversation-id"] = conversationId; + + // Disable response buffering if supported + httpContext.Features.Get()?.DisableBuffering(); + + try + { + await foreach (StreamChunk chunk in this._streamHandler.ReadStreamAsync( + conversationId, + cursor, + cancellationToken)) + { + if (chunk.Error != null) + { + this._logger.LogWarning("Stream error for conversation {ConversationId}: {Error}", conversationId, chunk.Error); + await WriteErrorAsync(httpContext.Response, chunk.Error, useSseFormat, cancellationToken); + break; + } + + if (chunk.IsDone) + { + await WriteEndOfStreamAsync(httpContext.Response, chunk.EntryId, useSseFormat, cancellationToken); + break; + } + + if (chunk.Text != null) + { + await WriteChunkAsync(httpContext.Response, chunk, useSseFormat, cancellationToken); + } + } + } + catch (OperationCanceledException) + { + this._logger.LogInformation("Client disconnected from stream {ConversationId}", conversationId); + } + + return new EmptyResult(); + } + + /// + /// Writes a text chunk to the response. + /// + private static async Task WriteChunkAsync( + HttpResponse response, + StreamChunk chunk, + bool useSseFormat, + CancellationToken cancellationToken) + { + if (useSseFormat) + { + await WriteSSEEventAsync(response, "message", chunk.Text!, chunk.EntryId); + } + else + { + await response.WriteAsync(chunk.Text!, cancellationToken); + } + + await response.Body.FlushAsync(cancellationToken); + } + + /// + /// Writes an end-of-stream marker to the response. + /// + private static async Task WriteEndOfStreamAsync( + HttpResponse response, + string entryId, + bool useSseFormat, + CancellationToken cancellationToken) + { + if (useSseFormat) + { + await WriteSSEEventAsync(response, "done", "[DONE]", entryId); + } + else + { + await response.WriteAsync("\n", cancellationToken); + } + + await response.Body.FlushAsync(cancellationToken); + } + + /// + /// Writes an error message to the response. + /// + private static async Task WriteErrorAsync( + HttpResponse response, + string error, + bool useSseFormat, + CancellationToken cancellationToken) + { + if (useSseFormat) + { + await WriteSSEEventAsync(response, "error", error, null); + } + else + { + await response.WriteAsync($"\n[Error: {error}]\n", cancellationToken); + } + + await response.Body.FlushAsync(cancellationToken); + } + + /// + /// Writes a Server-Sent Event to the response stream. + /// + private static async Task WriteSSEEventAsync( + HttpResponse response, + string eventType, + string data, + string? id) + { + StringBuilder sb = new(); + + // Include the ID if provided (used as cursor for resumption) + if (!string.IsNullOrEmpty(id)) + { + sb.AppendLine($"id: {id}"); + } + + sb.AppendLine($"event: {eventType}"); + sb.AppendLine($"data: {data}"); + sb.AppendLine(); // Empty line marks end of event + + await response.WriteAsync(sb.ToString()); + } +} diff --git a/dotnet/samples/AzureFunctions/08_ReliableStreaming/Program.cs b/dotnet/samples/AzureFunctions/08_ReliableStreaming/Program.cs new file mode 100644 index 0000000000..6c48ed4177 --- /dev/null +++ b/dotnet/samples/AzureFunctions/08_ReliableStreaming/Program.cs @@ -0,0 +1,100 @@ +// Copyright (c) Microsoft. All rights reserved. + +// This sample demonstrates how to implement reliable streaming for durable agents using Redis Streams. +// It exposes two HTTP endpoints: +// 1. Create - Starts an agent run and streams responses back via Server-Sent Events (SSE) +// 2. Stream - Resumes a stream from a specific cursor position, enabling reliable message delivery +// +// This pattern is inspired by OpenAI's background mode for the Responses API, which allows clients +// to disconnect and reconnect to ongoing agent responses without losing messages. + +using Azure; +using Azure.AI.OpenAI; +using Azure.Identity; +using Microsoft.Agents.AI.DurableTask; +using Microsoft.Agents.AI.Hosting.AzureFunctions; +using Microsoft.Azure.Functions.Worker.Builder; +using Microsoft.Extensions.AI; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using OpenAI.Chat; +using ReliableStreaming; +using StackExchange.Redis; + +// Get the Azure OpenAI endpoint and deployment name from environment variables. +string endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") + ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set."); +string deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT") + ?? throw new InvalidOperationException("AZURE_OPENAI_DEPLOYMENT is not set."); + +// Get Redis connection string from environment variable. +string redisConnectionString = Environment.GetEnvironmentVariable("REDIS_CONNECTION_STRING") + ?? "localhost:6379"; + +// Get the Redis stream TTL from environment variable (default: 10 minutes). +int redisStreamTtlMinutes = int.TryParse( + Environment.GetEnvironmentVariable("REDIS_STREAM_TTL_MINUTES"), + out int ttlMinutes) ? ttlMinutes : 10; + +// Use Azure Key Credential if provided, otherwise use Azure CLI Credential. +string? azureOpenAiKey = System.Environment.GetEnvironmentVariable("AZURE_OPENAI_KEY"); +AzureOpenAIClient client = !string.IsNullOrEmpty(azureOpenAiKey) + ? new AzureOpenAIClient(new Uri(endpoint), new AzureKeyCredential(azureOpenAiKey)) + : new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential()); + +// Travel Planner agent instructions - designed to produce longer responses for demonstrating streaming. +const string TravelPlannerName = "TravelPlanner"; +const string TravelPlannerInstructions = + """ + You are an expert travel planner who creates detailed, personalized travel itineraries. + When asked to plan a trip, you should: + 1. Create a comprehensive day-by-day itinerary + 2. Include specific recommendations for activities, restaurants, and attractions + 3. Provide practical tips for each destination + 4. Consider weather and local events when making recommendations + 5. Include estimated times and logistics between activities + + Always use the available tools to get current weather forecasts and local events + for the destination to make your recommendations more relevant and timely. + + Format your response with clear headings for each day and include emoji icons + to make the itinerary easy to scan and visually appealing. + """; + +// Configure the function app to host the AI agent. +FunctionsApplicationBuilder builder = FunctionsApplication + .CreateBuilder(args) + .ConfigureFunctionsWebApplication() + .ConfigureDurableAgents(options => + { + // Define the Travel Planner agent with tools for weather and events + options.AddAIAgentFactory(TravelPlannerName, sp => + { + return client.GetChatClient(deploymentName).CreateAIAgent( + instructions: TravelPlannerInstructions, + name: TravelPlannerName, + services: sp, + tools: [ + AIFunctionFactory.Create(TravelTools.GetWeatherForecast), + AIFunctionFactory.Create(TravelTools.GetLocalEvents), + ]); + }); + }); + +// Register Redis connection as a singleton +builder.Services.AddSingleton(_ => + ConnectionMultiplexer.Connect(redisConnectionString)); + +// Register the Redis stream response handler - this captures agent responses +// and publishes them to Redis Streams for reliable delivery. +// Registered as both the concrete type (for FunctionTriggers) and the interface (for the agent framework). +builder.Services.AddSingleton(sp => + new RedisStreamResponseHandler( + sp.GetRequiredService(), + TimeSpan.FromMinutes(redisStreamTtlMinutes))); +builder.Services.AddSingleton(sp => + sp.GetRequiredService()); + +using IHost app = builder.Build(); + +app.Run(); diff --git a/dotnet/samples/AzureFunctions/08_ReliableStreaming/README.md b/dotnet/samples/AzureFunctions/08_ReliableStreaming/README.md new file mode 100644 index 0000000000..f1c68c2339 --- /dev/null +++ b/dotnet/samples/AzureFunctions/08_ReliableStreaming/README.md @@ -0,0 +1,264 @@ +# Reliable Streaming with Redis + +This sample demonstrates how to implement reliable streaming for durable agents using Redis Streams as a message broker. It enables clients to disconnect and reconnect to ongoing agent responses without losing messages, inspired by [OpenAI's background mode](https://platform.openai.com/docs/guides/background) for the Responses API. + +## Key Concepts Demonstrated + +- **Reliable message delivery**: Agent responses are persisted to Redis Streams, allowing clients to resume from any point +- **Content negotiation**: Use `Accept: text/plain` for raw terminal output, or `Accept: text/event-stream` for SSE format +- **Server-Sent Events (SSE)**: Standard streaming format that works with `curl`, browsers, and most HTTP clients +- **Cursor-based resumption**: Each SSE event includes an `id` field that can be used to resume the stream +- **Fire-and-forget agent invocation**: The agent runs in the background while the client streams from Redis via an HTTP trigger function + +## Environment Setup + +See the [README.md](../README.md) file in the parent directory for more information on how to configure the environment, including how to install and run common sample dependencies. + +### Additional Requirements: Redis + +This sample requires a Redis instance. Start a local Redis instance using Docker: + +```bash +docker run -d --name redis -p 6379:6379 redis:latest +``` + +To verify Redis is running: + +```bash +docker ps | grep redis +``` + +## Running the Sample + +Start the Azure Functions host: + +```bash +func start +``` + +### 1. Test Streaming with curl + +Open a new terminal and start a travel planning request. Use the `-i` flag to see response headers (including the conversation ID) and `Accept: text/plain` for raw text output: + +**Bash (Linux/macOS/WSL):** + +```bash +curl -i -N -X POST http://localhost:7071/api/agent/create \ + -H "Content-Type: text/plain" \ + -H "Accept: text/plain" \ + -d "Plan a 7-day trip to Tokyo, Japan for next month. Include daily activities, restaurant recommendations, and tips for getting around." +``` + +**PowerShell:** + +```powershell +curl -i -N -X POST http://localhost:7071/api/agent/create ` + -H "Content-Type: text/plain" ` + -H "Accept: text/plain" ` + -d "Plan a 7-day trip to Tokyo, Japan for next month. Include daily activities, restaurant recommendations, and tips for getting around." +``` + +You'll first see the response headers, including: + +```text +HTTP/1.1 200 OK +Content-Type: text/plain; charset=utf-8 +x-conversation-id: @dafx-travelplanner@a1b2c3d4e5f67890abcdef1234567890 +... +``` + +Then the agent's response will stream to your terminal in chunks, similar to a ChatGPT-style experience (though not character-by-character). + +> **Note:** The `-N` flag in curl disables output buffering, which is essential for seeing the stream in real-time. The `-i` flag includes the HTTP headers in the output. + +### 2. Demonstrate Stream Interruption and Resumption + +This is the key feature of reliable streaming! Follow these steps to see it in action: + +#### Step 1: Start a stream and note the conversation ID + +Run the curl command from step 1. Watch for the `x-conversation-id` header in the response - **copy this value**, you'll need it to resume. + +```text +x-conversation-id: @dafx-travelplanner@a1b2c3d4e5f67890abcdef1234567890 +``` + +#### Step 2: Interrupt the stream + +While the agent is still generating text, press **`Ctrl+C`** to interrupt the stream. The agent continues running in the background - your messages are being saved to Redis! + +#### Step 3: Resume the stream + +Use the conversation ID you copied to resume streaming from where you left off. Include the `Accept: text/plain` header to get raw text output: + +**Bash (Linux/macOS/WSL):** + +```bash +# Replace with your actual conversation ID from the x-conversation-id header +CONVERSATION_ID="@dafx-travelplanner@a1b2c3d4e5f67890abcdef1234567890" + +curl -N -H "Accept: text/plain" "http://localhost:7071/api/agent/stream/${CONVERSATION_ID}" +``` + +**PowerShell:** + +```powershell +# Replace with your actual conversation ID from the x-conversation-id header +$conversationId = "@dafx-travelplanner@a1b2c3d4e5f67890abcdef1234567890" + +curl -N -H "Accept: text/plain" "http://localhost:7071/api/agent/stream/$conversationId" +``` + +You'll see the **entire response replayed from the beginning**, including the parts you already received before interrupting. + +#### Step 4 (Advanced): Resume from a specific cursor + +If you're using SSE format, each event includes an `id` field that you can use as a cursor to resume from a specific point: + +```bash +# Resume from a specific cursor position +curl -N "http://localhost:7071/api/agent/stream/${CONVERSATION_ID}?cursor=1734567890123-0" +``` + +### 3. Alternative: SSE Format for Programmatic Clients + +If you need the full Server-Sent Events format with cursors for resumable streaming, use `Accept: text/event-stream` (or omit the Accept header): + +```bash +curl -i -N -X POST http://localhost:7071/api/agent/create \ + -H "Content-Type: text/plain" \ + -H "Accept: text/event-stream" \ + -d "Plan a 7-day trip to Tokyo, Japan." +``` + +This returns SSE-formatted events with `id`, `event`, and `data` fields: + +```text +id: 1734567890123-0 +event: message +data: # 7-Day Tokyo Adventure + +id: 1734567890124-0 +event: message +data: ## Day 1: Arrival and Exploration + +id: 1734567890999-0 +event: done +data: [DONE] +``` + +The `id` field is the Redis stream entry ID - use it as the `cursor` parameter to resume from that exact point. + +### Understanding the Response Headers + +| Header | Description | +|--------|-------------| +| `x-conversation-id` | The conversation ID (session key). Use this to resume the stream. | +| `Content-Type` | Either `text/plain` or `text/event-stream` depending on your `Accept` header. | +| `Cache-Control` | Set to `no-cache` to prevent caching of the stream. | + +## Architecture Overview + +```text +┌─────────────┐ POST /agent/create ┌─────────────────────┐ +│ Client │ (Accept: text/plain or SSE)│ Azure Functions │ +│ (curl) │ ──────────────────────────► │ (FunctionTriggers) │ +└─────────────┘ └──────────┬──────────┘ + ▲ │ + │ Text or SSE stream Signal Entity + │ │ + │ ▼ + │ ┌─────────────────────┐ + │ │ AgentEntity │ + │ │ (Durable Entity) │ + │ └──────────┬──────────┘ + │ │ + │ IAgentResponseHandler + │ │ + │ ▼ + │ ┌─────────────────────┐ + │ │ RedisStreamResponse │ + │ │ Handler │ + │ └──────────┬──────────┘ + │ │ + │ XADD (write) + │ │ + │ ▼ + │ ┌─────────────────────┐ + └─────────── XREAD (poll) ────────── │ Redis Streams │ + │ (Durable Log) │ + └─────────────────────┘ +``` + +### Data Flow + +1. **Client sends prompt**: The `Create` endpoint receives the prompt and generates a new agent thread. + +2. **Agent invoked**: The durable entity (`AgentEntity`) is signaled to run the travel planner agent. This is fire-and-forget from the HTTP request's perspective. + +3. **Responses captured**: As the agent generates responses, `RedisStreamResponseHandler` (implementing `IAgentResponseHandler`) extracts the text from each `AgentRunResponseUpdate` and publishes it to a Redis Stream keyed by session ID. + +4. **Client polls Redis**: The HTTP response streams events by polling the Redis Stream. For SSE format, each event includes the Redis entry ID as the `id` field. + +5. **Resumption**: If the client disconnects, it can call the `Stream` endpoint with the conversation ID (from the `x-conversation-id` header) and optionally the last received cursor to resume from that point. + +## Message Delivery Guarantees + +This sample provides **at-least-once delivery** with the following characteristics: + +- **Durability**: Messages are persisted to Redis Streams with configurable TTL (default: 10 minutes). +- **Ordering**: Messages are delivered in order within a session. +- **Resumption**: Clients can resume from any point using cursor-based pagination. +- **Replay**: Clients can replay the entire stream by omitting the cursor. + +### Important Considerations + +- **No exactly-once delivery**: If a client disconnects exactly when receiving a message, it may receive that message again upon resumption. Clients should handle duplicate messages idempotently. +- **TTL expiration**: Streams expire after the configured TTL. Clients cannot resume streams that have expired. +- **Redis guarantees**: Redis streams are backed by Redis persistence mechanisms (RDB/AOF). Ensure your Redis instance is configured for durability as needed. + +## When to Use These Patterns + +The patterns demonstrated in this sample are ideal for: + +- **Long-running agent tasks**: When agent responses take minutes to complete (e.g., deep research, complex planning) +- **Unreliable network connections**: Mobile apps, unstable WiFi, or connections that may drop +- **Resumable experiences**: Users should be able to close and reopen an app without losing context +- **Background processing**: When you want to fire off a task and check on it later + +These patterns may be overkill for: + +- **Simple, fast responses**: If responses complete in a few seconds, standard streaming is simpler +- **Stateless interactions**: If there's no need to resume or replay conversations +- **Very high throughput**: Redis adds latency; for maximum throughput, direct streaming may be better + +## Configuration + +| Environment Variable | Description | Default | +|---------------------|-------------|---------| +| `REDIS_CONNECTION_STRING` | Redis connection string | `localhost:6379` | +| `REDIS_STREAM_TTL_MINUTES` | How long streams are retained after last write | `10` | +| `AZURE_OPENAI_ENDPOINT` | Azure OpenAI endpoint URL | (required) | +| `AZURE_OPENAI_DEPLOYMENT` | Azure OpenAI deployment name | (required) | +| `AZURE_OPENAI_KEY` | API key (optional, uses Azure CLI auth if not set) | (optional) | + +## Cleanup + +To stop and remove the Redis Docker containers: + +```bash +docker stop redis +docker rm redis +``` + +## Disclaimer + +> ⚠️ **This sample is for illustration purposes only and is not intended to be production-ready.** +> +> A production implementation should consider: +> +> - Redis cluster configuration for high availability +> - Authentication and authorization for the streaming endpoints +> - Rate limiting and abuse prevention +> - Monitoring and alerting for stream health +> - Graceful handling of Redis failures diff --git a/dotnet/samples/AzureFunctions/08_ReliableStreaming/RedisStreamResponseHandler.cs b/dotnet/samples/AzureFunctions/08_ReliableStreaming/RedisStreamResponseHandler.cs new file mode 100644 index 0000000000..b0a95f49f6 --- /dev/null +++ b/dotnet/samples/AzureFunctions/08_ReliableStreaming/RedisStreamResponseHandler.cs @@ -0,0 +1,213 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Runtime.CompilerServices; +using Microsoft.Agents.AI; +using Microsoft.Agents.AI.DurableTask; +using StackExchange.Redis; + +namespace ReliableStreaming; + +/// +/// Represents a chunk of data read from a Redis stream. +/// +/// The Redis stream entry ID (can be used as a cursor for resumption). +/// The text content of the chunk, or null if this is a completion/error marker. +/// True if this chunk marks the end of the stream. +/// An error message if something went wrong, or null otherwise. +public readonly record struct StreamChunk(string EntryId, string? Text, bool IsDone, string? Error); + +/// +/// An implementation of that publishes agent response updates +/// to Redis Streams for reliable delivery. This enables clients to disconnect and reconnect +/// to ongoing agent responses without losing messages. +/// +/// +/// +/// Redis Streams provide a durable, append-only log that supports consumer groups and message +/// acknowledgment. This implementation uses auto-generated IDs (which are timestamp-based) +/// as sequence numbers, allowing clients to resume from any point in the stream. +/// +/// +/// Each agent session gets its own Redis Stream, keyed by session ID. The stream entries +/// contain text chunks extracted from objects. +/// +/// +public sealed class RedisStreamResponseHandler : IAgentResponseHandler +{ + private const int MaxEmptyReads = 300; // 5 minutes at 1 second intervals + private const int PollIntervalMs = 1000; + + private readonly IConnectionMultiplexer _redis; + private readonly TimeSpan _streamTtl; + + /// + /// Initializes a new instance of the class. + /// + /// The Redis connection multiplexer. + /// The time-to-live for stream entries. Streams will expire after this duration of inactivity. + public RedisStreamResponseHandler(IConnectionMultiplexer redis, TimeSpan streamTtl) + { + this._redis = redis; + this._streamTtl = streamTtl; + } + + /// + public async ValueTask OnStreamingResponseUpdateAsync( + IAsyncEnumerable messageStream, + CancellationToken cancellationToken) + { + // Get the current session ID from the DurableAgentContext + // This is set by the AgentEntity before invoking the response handler + DurableAgentContext? context = DurableAgentContext.Current; + if (context is null) + { + throw new InvalidOperationException( + "DurableAgentContext.Current is not set. This handler must be used within a durable agent context."); + } + + // Get conversation ID from the current thread context, which is only available in the context of + // a durable agent execution. + string conversationId = context.CurrentThread.GetService()?.ConversationId + ?? throw new InvalidOperationException("Unable to determine conversation ID from the current thread."); + string streamKey = GetStreamKey(conversationId); + + IDatabase db = this._redis.GetDatabase(); + int sequenceNumber = 0; + + await foreach (AgentRunResponseUpdate update in messageStream.WithCancellation(cancellationToken)) + { + // Extract just the text content - this avoids serialization round-trip issues + string text = update.Text; + + // Only publish non-empty text chunks + if (!string.IsNullOrEmpty(text)) + { + // Create the stream entry with the text and metadata + NameValueEntry[] entries = + [ + new NameValueEntry("text", text), + new NameValueEntry("sequence", sequenceNumber++), + new NameValueEntry("timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()), + ]; + + // Add to the Redis Stream with auto-generated ID (timestamp-based) + await db.StreamAddAsync(streamKey, entries); + + // Refresh the TTL on each write to keep the stream alive during active streaming + await db.KeyExpireAsync(streamKey, this._streamTtl); + } + } + + // Add a sentinel entry to mark the end of the stream + NameValueEntry[] endEntries = + [ + new NameValueEntry("text", ""), + new NameValueEntry("sequence", sequenceNumber), + new NameValueEntry("timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()), + new NameValueEntry("done", "true"), + ]; + await db.StreamAddAsync(streamKey, endEntries); + + // Set final TTL - the stream will be cleaned up after this duration + await db.KeyExpireAsync(streamKey, this._streamTtl); + } + + /// + public ValueTask OnAgentResponseAsync(AgentRunResponse message, CancellationToken cancellationToken) + { + // This handler is optimized for streaming responses. + // For non-streaming responses, we don't need to store in Redis since + // the response is returned directly to the caller. + return ValueTask.CompletedTask; + } + + /// + /// Reads chunks from a Redis stream for the given session, yielding them as they become available. + /// + /// The conversation ID to read from. + /// Optional cursor to resume from. If null, reads from the beginning. + /// Cancellation token. + /// An async enumerable of stream chunks. + public async IAsyncEnumerable ReadStreamAsync( + string conversationId, + string? cursor, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + string streamKey = GetStreamKey(conversationId); + + IDatabase db = this._redis.GetDatabase(); + string startId = string.IsNullOrEmpty(cursor) ? "0-0" : cursor; + + int emptyReadCount = 0; + bool hasSeenData = false; + + while (!cancellationToken.IsCancellationRequested) + { + StreamEntry[]? entries = null; + string? errorMessage = null; + + try + { + entries = await db.StreamReadAsync(streamKey, startId, count: 100); + } + catch (Exception ex) + { + errorMessage = ex.Message; + } + + if (errorMessage != null) + { + yield return new StreamChunk(startId, null, false, errorMessage); + yield break; + } + + // entries is guaranteed to be non-null if errorMessage is null + if (entries!.Length == 0) + { + if (!hasSeenData) + { + emptyReadCount++; + if (emptyReadCount >= MaxEmptyReads) + { + yield return new StreamChunk( + startId, + null, + false, + $"Stream not found or timed out after {MaxEmptyReads * PollIntervalMs / 1000} seconds"); + yield break; + } + } + + await Task.Delay(PollIntervalMs, cancellationToken); + continue; + } + + hasSeenData = true; + + foreach (StreamEntry entry in entries) + { + startId = entry.Id.ToString(); + string? text = entry["text"]; + string? done = entry["done"]; + + if (done == "true") + { + yield return new StreamChunk(startId, null, true, null); + yield break; + } + + if (!string.IsNullOrEmpty(text)) + { + yield return new StreamChunk(startId, text, false, null); + } + } + } + } + + /// + /// Gets the Redis Stream key for a given conversation ID. + /// + /// The conversation ID. + /// The Redis Stream key. + internal static string GetStreamKey(string conversationId) => $"agent-stream:{conversationId}"; +} diff --git a/dotnet/samples/AzureFunctions/08_ReliableStreaming/Tools.cs b/dotnet/samples/AzureFunctions/08_ReliableStreaming/Tools.cs new file mode 100644 index 0000000000..fce73bc378 --- /dev/null +++ b/dotnet/samples/AzureFunctions/08_ReliableStreaming/Tools.cs @@ -0,0 +1,161 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.ComponentModel; + +namespace ReliableStreaming; + +/// +/// Mock travel tools that return hardcoded data for demonstration purposes. +/// In a real application, these would call actual weather and events APIs. +/// +internal static class TravelTools +{ + /// + /// Gets a weather forecast for a destination on a specific date. + /// Returns mock weather data for demonstration purposes. + /// + /// The destination city or location. + /// The date for the forecast (e.g., "2025-01-15" or "next Monday"). + /// A weather forecast summary. + [Description("Gets the weather forecast for a destination on a specific date. Use this to provide weather-aware recommendations in the itinerary.")] + public static string GetWeatherForecast(string destination, string date) + { + // Mock weather data based on destination for realistic responses + Dictionary weatherByRegion = new(StringComparer.OrdinalIgnoreCase) + { + ["Tokyo"] = ("Partly cloudy with a chance of light rain", 58, 45), + ["Paris"] = ("Overcast with occasional drizzle", 52, 41), + ["New York"] = ("Clear and cold", 42, 28), + ["London"] = ("Foggy morning, clearing in afternoon", 48, 38), + ["Sydney"] = ("Sunny and warm", 82, 68), + ["Rome"] = ("Sunny with light breeze", 62, 48), + ["Barcelona"] = ("Partly sunny", 59, 47), + ["Amsterdam"] = ("Cloudy with light rain", 46, 38), + ["Dubai"] = ("Sunny and hot", 85, 72), + ["Singapore"] = ("Tropical thunderstorms in afternoon", 88, 77), + ["Bangkok"] = ("Hot and humid, afternoon showers", 91, 78), + ["Los Angeles"] = ("Sunny and pleasant", 72, 55), + ["San Francisco"] = ("Morning fog, afternoon sun", 62, 52), + ["Seattle"] = ("Rainy with breaks", 48, 40), + ["Miami"] = ("Warm and sunny", 78, 65), + ["Honolulu"] = ("Tropical paradise weather", 82, 72), + }; + + // Find a matching destination or use a default + (string condition, int highF, int lowF) forecast = ("Partly cloudy", 65, 50); + foreach (KeyValuePair entry in weatherByRegion) + { + if (destination.Contains(entry.Key, StringComparison.OrdinalIgnoreCase)) + { + forecast = entry.Value; + break; + } + } + + return $""" + Weather forecast for {destination} on {date}: + Conditions: {forecast.condition} + High: {forecast.highF}°F ({(forecast.highF - 32) * 5 / 9}°C) + Low: {forecast.lowF}°F ({(forecast.lowF - 32) * 5 / 9}°C) + + Recommendation: {GetWeatherRecommendation(forecast.condition)} + """; + } + + /// + /// Gets local events happening at a destination around a specific date. + /// Returns mock event data for demonstration purposes. + /// + /// The destination city or location. + /// The date to search for events (e.g., "2025-01-15" or "next week"). + /// A list of local events and activities. + [Description("Gets local events and activities happening at a destination around a specific date. Use this to suggest timely activities and experiences.")] + public static string GetLocalEvents(string destination, string date) + { + // Mock events data based on destination + Dictionary eventsByCity = new(StringComparer.OrdinalIgnoreCase) + { + ["Tokyo"] = [ + "🎭 Kabuki Theater Performance at Kabukiza Theatre - Traditional Japanese drama", + "🌸 Winter Illuminations at Yoyogi Park - Spectacular light displays", + "🍜 Ramen Festival at Tokyo Station - Sample ramen from across Japan", + "🎮 Gaming Expo at Tokyo Big Sight - Latest video games and technology", + ], + ["Paris"] = [ + "🎨 Impressionist Exhibition at Musée d'Orsay - Extended evening hours", + "🍷 Wine Tasting Tour in Le Marais - Local sommelier guided", + "🎵 Jazz Night at Le Caveau de la Huchette - Historic jazz club", + "🥐 French Pastry Workshop - Learn from master pâtissiers", + ], + ["New York"] = [ + "🎭 Broadway Show: Hamilton - Limited engagement performances", + "🏀 Knicks vs Lakers at Madison Square Garden", + "🎨 Modern Art Exhibit at MoMA - New installations", + "🍕 Pizza Walking Tour of Brooklyn - Artisan pizzerias", + ], + ["London"] = [ + "👑 Royal Collection Exhibition at Buckingham Palace", + "🎭 West End Musical: The Phantom of the Opera", + "🍺 Craft Beer Festival at Brick Lane", + "🎪 Winter Wonderland at Hyde Park - Rides and markets", + ], + ["Sydney"] = [ + "🏄 Pro Surfing Competition at Bondi Beach", + "🎵 Opera at Sydney Opera House - La Bohème", + "🦘 Wildlife Night Safari at Taronga Zoo", + "🍽️ Harbor Dinner Cruise with fireworks", + ], + ["Rome"] = [ + "🏛️ After-Hours Vatican Tour - Skip the crowds", + "🍝 Pasta Making Class in Trastevere", + "🎵 Classical Concert at Borghese Gallery", + "🍷 Wine Tasting in Roman Cellars", + ], + }; + + // Find events for the destination or use generic events + string[] events = [ + "🎭 Local theater performance", + "🍽️ Food and wine festival", + "🎨 Art gallery opening", + "🎵 Live music at local venues", + ]; + + foreach (KeyValuePair entry in eventsByCity) + { + if (destination.Contains(entry.Key, StringComparison.OrdinalIgnoreCase)) + { + events = entry.Value; + break; + } + } + + string eventList = string.Join("\n• ", events); + return $""" + Local events in {destination} around {date}: + + • {eventList} + + 💡 Tip: Book popular events in advance as they may sell out quickly! + """; + } + + private static string GetWeatherRecommendation(string condition) + { + // Use case-insensitive comparison instead of ToLowerInvariant() to satisfy CA1308 + return condition switch + { + string c when c.Contains("rain", StringComparison.OrdinalIgnoreCase) || c.Contains("drizzle", StringComparison.OrdinalIgnoreCase) => + "Bring an umbrella and waterproof jacket. Consider indoor activities for backup.", + string c when c.Contains("fog", StringComparison.OrdinalIgnoreCase) => + "Morning visibility may be limited. Plan outdoor sightseeing for afternoon.", + string c when c.Contains("cold", StringComparison.OrdinalIgnoreCase) => + "Layer up with warm clothing. Hot drinks and cozy cafés recommended.", + string c when c.Contains("hot", StringComparison.OrdinalIgnoreCase) || c.Contains("warm", StringComparison.OrdinalIgnoreCase) => + "Stay hydrated and use sunscreen. Plan strenuous activities for cooler morning hours.", + string c when c.Contains("thunder", StringComparison.OrdinalIgnoreCase) || c.Contains("storm", StringComparison.OrdinalIgnoreCase) => + "Keep an eye on weather updates. Have indoor alternatives ready.", + _ => "Pleasant conditions expected. Great day for outdoor exploration!" + }; + } +} diff --git a/dotnet/samples/AzureFunctions/08_ReliableStreaming/host.json b/dotnet/samples/AzureFunctions/08_ReliableStreaming/host.json new file mode 100644 index 0000000000..4247b37c97 --- /dev/null +++ b/dotnet/samples/AzureFunctions/08_ReliableStreaming/host.json @@ -0,0 +1,21 @@ +{ + "version": "2.0", + "logging": { + "logLevel": { + "Microsoft.Agents.AI.DurableTask": "Information", + "Microsoft.Agents.AI.Hosting.AzureFunctions": "Information", + "DurableTask": "Information", + "Microsoft.DurableTask": "Information", + "ReliableStreaming": "Information" + } + }, + "extensions": { + "durableTask": { + "hubName": "default", + "storageProvider": { + "type": "AzureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + } + } + } +} diff --git a/dotnet/samples/AzureFunctions/08_ReliableStreaming/local.settings.json b/dotnet/samples/AzureFunctions/08_ReliableStreaming/local.settings.json new file mode 100644 index 0000000000..5dfdb17999 --- /dev/null +++ b/dotnet/samples/AzureFunctions/08_ReliableStreaming/local.settings.json @@ -0,0 +1,12 @@ +{ + "IsEncrypted": false, + "Values": { + "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated", + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None", + "AZURE_OPENAI_ENDPOINT": "", + "AZURE_OPENAI_DEPLOYMENT": "", + "REDIS_CONNECTION_STRING": "localhost:6379", + "REDIS_STREAM_TTL_MINUTES": "10" + } +} diff --git a/dotnet/samples/AzureFunctions/README.md b/dotnet/samples/AzureFunctions/README.md index e60b0f662e..2545712ea4 100644 --- a/dotnet/samples/AzureFunctions/README.md +++ b/dotnet/samples/AzureFunctions/README.md @@ -9,6 +9,7 @@ This directory contains samples for Azure Functions. - **[05_AgentOrchestration_HITL](05_AgentOrchestration_HITL)**: A sample that demonstrates how to implement a human-in-the-loop workflow using durable orchestration, including external event handling for human approval. - **[06_LongRunningTools](06_LongRunningTools)**: A sample that demonstrates how agents can start and interact with durable orchestrations from tool calls to enable long-running tool scenarios. - **[07_AgentAsMcpTool](07_AgentAsMcpTool)**: A sample that demonstrates how to configure durable AI agents to be accessible as Model Context Protocol (MCP) tools. +- **[08_ReliableStreaming](08_ReliableStreaming)**: A sample that demonstrates how to implement reliable streaming for durable agents using Redis Streams, enabling clients to disconnect and reconnect without losing messages. ## Running the Samples diff --git a/dotnet/tests/Microsoft.Agents.AI.Hosting.AzureFunctions.IntegrationTests/SamplesValidation.cs b/dotnet/tests/Microsoft.Agents.AI.Hosting.AzureFunctions.IntegrationTests/SamplesValidation.cs index 0ba879f024..c80cd73941 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Hosting.AzureFunctions.IntegrationTests/SamplesValidation.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Hosting.AzureFunctions.IntegrationTests/SamplesValidation.cs @@ -19,6 +19,7 @@ public sealed class SamplesValidation(ITestOutputHelper outputHelper) : IAsyncLi private const string AzureFunctionsPort = "7071"; private const string AzuritePort = "10000"; private const string DtsPort = "8080"; + private const string RedisPort = "6379"; private static readonly string s_dotnetTargetFramework = GetTargetFramework(); private static readonly HttpClient s_sharedHttpClient = new(); @@ -392,6 +393,136 @@ await this.WaitForConditionAsync( }); } + [Fact] + public async Task ReliableStreamingSampleValidationAsync() + { + string samplePath = Path.Combine(s_samplesPath, "08_ReliableStreaming"); + await this.RunSampleTestAsync(samplePath, async (logs) => + { + Uri createUri = new($"http://localhost:{AzureFunctionsPort}/api/agent/create"); + this._outputHelper.WriteLine($"Starting reliable streaming agent via POST request to {createUri}..."); + + // Test the agent endpoint with a simple prompt + const string RequestBody = "Plan a 3-day trip to Seattle. Include daily activities."; + using HttpContent content = new StringContent(RequestBody, Encoding.UTF8, "text/plain"); + using HttpRequestMessage request = new(HttpMethod.Post, createUri) + { + Content = content + }; + request.Headers.Add("Accept", "text/plain"); + + using HttpResponseMessage response = await s_sharedHttpClient.SendAsync( + request, + HttpCompletionOption.ResponseHeadersRead); + + // The response should be successful + Assert.True(response.IsSuccessStatusCode, $"Agent request failed with status: {response.StatusCode}"); + Assert.Equal("text/plain", response.Content.Headers.ContentType?.MediaType); + + // The response headers should include the conversation ID + string? conversationId = response.Headers.GetValues("x-conversation-id")?.FirstOrDefault(); + Assert.NotNull(conversationId); + Assert.NotEmpty(conversationId); + this._outputHelper.WriteLine($"Agent conversation ID: {conversationId}"); + + // Read the streamed response + using Stream responseStream = await response.Content.ReadAsStreamAsync(); + using StreamReader reader = new(responseStream); + StringBuilder responseText = new(); + char[] buffer = new char[1024]; + int bytesRead; + + // Read for a reasonable amount of time to get some content + using CancellationTokenSource readTimeout = new(TimeSpan.FromSeconds(30)); + try + { + while (!readTimeout.Token.IsCancellationRequested) + { + bytesRead = await reader.ReadAsync(buffer, 0, buffer.Length); + if (bytesRead == 0) + { + // Check if we've received enough content + if (responseText.Length > 50) + { + break; + } + await Task.Delay(100, readTimeout.Token); + continue; + } + + responseText.Append(buffer, 0, bytesRead); + if (responseText.Length > 200) + { + // We've received enough content to validate + break; + } + } + } + catch (OperationCanceledException) + { + // Timeout is acceptable if we got some content + } + + string responseContent = responseText.ToString(); + Assert.True(responseContent.Length > 0, "Expected to receive some streamed content"); + this._outputHelper.WriteLine($"Received {responseContent.Length} characters of streamed content"); + + // Test resumption by calling the stream endpoint + Uri streamUri = new($"http://localhost:{AzureFunctionsPort}/api/agent/stream/{conversationId}"); + this._outputHelper.WriteLine($"Testing stream resumption via GET request to {streamUri}..."); + + using HttpRequestMessage streamRequest = new(HttpMethod.Get, streamUri); + streamRequest.Headers.Add("Accept", "text/plain"); + + using HttpResponseMessage streamResponse = await s_sharedHttpClient.SendAsync( + streamRequest, + HttpCompletionOption.ResponseHeadersRead); + Assert.True(streamResponse.IsSuccessStatusCode, $"Stream request failed with status: {streamResponse.StatusCode}"); + Assert.Equal("text/plain", streamResponse.Content.Headers.ContentType?.MediaType); + + // Verify the conversation ID header is present + string? resumedConversationId = streamResponse.Headers.GetValues("x-conversation-id")?.FirstOrDefault(); + Assert.Equal(conversationId, resumedConversationId); + + // Read some content from the resumed stream + using Stream resumedStream = await streamResponse.Content.ReadAsStreamAsync(); + using StreamReader resumedReader = new(resumedStream); + StringBuilder resumedText = new(); + + using CancellationTokenSource resumedReadTimeout = new(TimeSpan.FromSeconds(10)); + try + { + while (!resumedReadTimeout.Token.IsCancellationRequested) + { + bytesRead = await resumedReader.ReadAsync(buffer, 0, buffer.Length); + if (bytesRead == 0) + { + if (resumedText.Length > 50) + { + break; + } + await Task.Delay(100, resumedReadTimeout.Token); + continue; + } + + resumedText.Append(buffer, 0, bytesRead); + if (resumedText.Length > 100) + { + break; + } + } + } + catch (OperationCanceledException) + { + // Timeout is acceptable if we got some content + } + + string resumedContent = resumedText.ToString(); + Assert.True(resumedContent.Length > 0, "Expected to receive some content from resumed stream"); + this._outputHelper.WriteLine($"Received {resumedContent.Length} characters from resumed stream"); + }); + } + private async Task InvokeMcpToolAsync(McpClient mcpClient, string toolName, string query) { this._outputHelper.WriteLine($"Invoking MCP tool '{toolName}'..."); @@ -482,6 +613,21 @@ await this.WaitForConditionAsync( message: "DTS emulator is running", timeout: TimeSpan.FromSeconds(30)); } + + // Start Redis if it's not already running + if (!await this.IsRedisRunningAsync()) + { + await this.StartDockerContainerAsync( + containerName: "redis", + image: "redis:latest", + ports: ["-p", "6379:6379"]); + + // Wait for Redis + await this.WaitForConditionAsync( + condition: this.IsRedisRunningAsync, + message: "Redis is running", + timeout: TimeSpan.FromSeconds(30)); + } } private async Task IsAzuriteRunningAsync() @@ -562,6 +708,49 @@ private async Task IsDtsEmulatorRunningAsync() } } + private async Task IsRedisRunningAsync() + { + this._outputHelper.WriteLine($"Checking if Redis is running at localhost:{RedisPort}..."); + + try + { + using CancellationTokenSource timeoutCts = new(TimeSpan.FromSeconds(30)); + ProcessStartInfo startInfo = new() + { + FileName = "docker", + Arguments = "exec redis redis-cli ping", + UseShellExecute = false, + RedirectStandardOutput = true, + RedirectStandardError = true, + CreateNoWindow = true + }; + + using Process process = new() { StartInfo = startInfo }; + if (!process.Start()) + { + this._outputHelper.WriteLine("Failed to start docker exec command"); + return false; + } + + string output = await process.StandardOutput.ReadToEndAsync(timeoutCts.Token); + await process.WaitForExitAsync(timeoutCts.Token); + + if (process.ExitCode == 0 && output.Contains("PONG", StringComparison.OrdinalIgnoreCase)) + { + this._outputHelper.WriteLine("Redis is running"); + return true; + } + + this._outputHelper.WriteLine($"Redis is not running. Exit code: {process.ExitCode}, Output: {output}"); + return false; + } + catch (Exception ex) + { + this._outputHelper.WriteLine($"Redis is not running: {ex.Message}"); + return false; + } + } + private async Task StartDockerContainerAsync(string containerName, string image, string[] ports) { // Stop existing container if it exists @@ -646,6 +835,7 @@ private Process StartFunctionApp(string samplePath, List logs) startInfo.EnvironmentVariables["DURABLE_TASK_SCHEDULER_CONNECTION_STRING"] = $"Endpoint=http://localhost:{DtsPort};TaskHub=default;Authentication=None"; startInfo.EnvironmentVariables["AzureWebJobsStorage"] = "UseDevelopmentStorage=true"; + startInfo.EnvironmentVariables["REDIS_CONNECTION_STRING"] = $"localhost:{RedisPort}"; Process process = new() { StartInfo = startInfo };