.NET: [Durable Agents] Reliable streaming sample (#2942)

* .NET: [Durable Agents] Reliable streaming sample

* Add automated validation for new sample

* Address Copilot PR feedback
This commit is contained in:
Chris Gillum
2025-12-19 15:43:36 -08:00
committed by GitHub
Unverified
parent 3b77192ad0
commit 0e152a0e33
13 changed files with 1344 additions and 0 deletions
@@ -28,6 +28,18 @@ runs:
echo "Waiting for Azurite (Azure Storage emulator) to be ready"
timeout 30 bash -c 'until curl --silent http://localhost:10000/devstoreaccount1; do sleep 1; done'
echo "Azurite (Azure Storage emulator) is ready"
- name: Start Redis
shell: bash
run: |
if [ "$(docker ps -aq -f name=redis)" ]; then
echo "Stopping and removing existing Redis"
docker rm -f redis
fi
echo "Starting Redis"
docker run -d --name redis -p 6379:6379 redis:latest
echo "Waiting for Redis to be ready"
timeout 30 bash -c 'until docker exec redis redis-cli ping | grep -q PONG; do sleep 1; done'
echo "Redis is ready"
- name: Install Azure Functions Core Tools
shell: bash
run: |
+2
View File
@@ -125,6 +125,8 @@
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore" Version="2.1.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Mcp" Version="1.0.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.0.7" />
<!-- Redis -->
<PackageVersion Include="StackExchange.Redis" Version="2.10.1" />
<!-- Test -->
<PackageVersion Include="FluentAssertions" Version="8.8.0" />
<PackageVersion Include="Microsoft.AspNetCore.TestHost" Condition="'$(TargetFramework)' == 'net8.0'" Version="8.0.22" />
+1
View File
@@ -33,6 +33,7 @@
<Project Path="samples/AzureFunctions/05_AgentOrchestration_HITL/05_AgentOrchestration_HITL.csproj" />
<Project Path="samples/AzureFunctions/06_LongRunningTools/06_LongRunningTools.csproj" />
<Project Path="samples/AzureFunctions/07_AgentAsMcpTool/07_AgentAsMcpTool.csproj" />
<Project Path="samples/AzureFunctions/08_ReliableStreaming/08_ReliableStreaming.csproj" />
</Folder>
<Folder Name="/Samples/GettingStarted/">
<File Path="samples/GettingStarted/README.md" />
@@ -0,0 +1,47 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>net10.0</TargetFrameworks>
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
<OutputType>Exe</OutputType>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<!-- The Functions build tools don't like namespaces that start with a number -->
<AssemblyName>ReliableStreaming</AssemblyName>
<RootNamespace>ReliableStreaming</RootNamespace>
</PropertyGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>
<!-- Azure Functions packages -->
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Functions.Worker" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask.AzureManaged" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Azure.AI.OpenAI" />
<PackageReference Include="Azure.Identity" />
</ItemGroup>
<!-- Redis for reliable streaming -->
<ItemGroup>
<PackageReference Include="StackExchange.Redis" />
</ItemGroup>
<!-- Local projects that should be switched to package references when using the sample outside of this MAF repo -->
<!--
<ItemGroup>
<PackageReference Include="Microsoft.Agents.AI.Hosting.AzureFunctions" />
<PackageReference Include="Microsoft.Agents.AI.OpenAI" />
</ItemGroup>
-->
<ItemGroup>
<ProjectReference Include="..\..\..\src\Microsoft.Agents.AI.Hosting.AzureFunctions\Microsoft.Agents.AI.Hosting.AzureFunctions.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.Agents.AI.OpenAI\Microsoft.Agents.AI.OpenAI.csproj" />
</ItemGroup>
</Project>
@@ -0,0 +1,320 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Text;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.DurableTask;
using Microsoft.Agents.AI.Hosting.AzureFunctions;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;
namespace ReliableStreaming;
/// <summary>
/// HTTP trigger functions for reliable streaming of durable agent responses.
/// </summary>
/// <remarks>
/// This class exposes two endpoints:
/// <list type="bullet">
/// <item>
/// <term>Create</term>
/// <description>Starts an agent run and streams responses. The response format depends on the
/// <c>Accept</c> header: <c>text/plain</c> returns raw text (ideal for terminals), while
/// <c>text/event-stream</c> or any other value returns Server-Sent Events (SSE).</description>
/// </item>
/// <item>
/// <term>Stream</term>
/// <description>Resumes a stream from a cursor position, enabling reliable message delivery</description>
/// </item>
/// </list>
/// </remarks>
public sealed class FunctionTriggers
{
private readonly RedisStreamResponseHandler _streamHandler;
private readonly ILogger<FunctionTriggers> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="FunctionTriggers"/> class.
/// </summary>
/// <param name="streamHandler">The Redis stream handler for reading/writing agent responses.</param>
/// <param name="logger">The logger instance.</param>
public FunctionTriggers(RedisStreamResponseHandler streamHandler, ILogger<FunctionTriggers> logger)
{
this._streamHandler = streamHandler;
this._logger = logger;
}
/// <summary>
/// Creates a new agent session, starts an agent run with the provided prompt,
/// and streams the response back to the client.
/// </summary>
/// <remarks>
/// <para>
/// The response format depends on the <c>Accept</c> header:
/// <list type="bullet">
/// <item><c>text/plain</c>: Returns raw text output, ideal for terminal display with curl</item>
/// <item><c>text/event-stream</c> or other: Returns Server-Sent Events (SSE) with cursor support</item>
/// </list>
/// </para>
/// <para>
/// The response includes an <c>x-conversation-id</c> header containing the conversation ID.
/// For SSE responses, clients can use this conversation ID to resume the stream if disconnected
/// by calling the <see cref="StreamAsync"/> endpoint with the conversation ID and the last received cursor.
/// </para>
/// <para>
/// Each SSE event contains the following fields:
/// <list type="bullet">
/// <item><c>id</c>: The Redis stream entry ID (use as cursor for resumption)</item>
/// <item><c>event</c>: Either "message" for content or "done" for stream completion</item>
/// <item><c>data</c>: The text content of the response chunk</item>
/// </list>
/// </para>
/// </remarks>
/// <param name="request">The HTTP request containing the prompt in the body.</param>
/// <param name="durableClient">The Durable Task client for signaling agents.</param>
/// <param name="context">The function invocation context.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A streaming response in the format specified by the Accept header.</returns>
[Function(nameof(CreateAsync))]
public async Task<IActionResult> CreateAsync(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "agent/create")] HttpRequest request,
[DurableClient] DurableTaskClient durableClient,
FunctionContext context,
CancellationToken cancellationToken)
{
// Read the prompt from the request body
string prompt = await new StreamReader(request.Body).ReadToEndAsync(cancellationToken);
if (string.IsNullOrWhiteSpace(prompt))
{
return new BadRequestObjectResult("Request body must contain a prompt.");
}
AIAgent agentProxy = durableClient.AsDurableAgentProxy(context, "TravelPlanner");
// Create a new agent thread
AgentThread thread = agentProxy.GetNewThread();
AgentThreadMetadata metadata = thread.GetService<AgentThreadMetadata>()
?? throw new InvalidOperationException("Failed to get AgentThreadMetadata from new thread.");
this._logger.LogInformation("Creating new agent session: {ConversationId}", metadata.ConversationId);
// Run the agent in the background (fire-and-forget)
DurableAgentRunOptions options = new() { IsFireAndForget = true };
await agentProxy.RunAsync(prompt, thread, options, cancellationToken);
this._logger.LogInformation("Agent run started for session: {ConversationId}", metadata.ConversationId);
// Check Accept header to determine response format
// text/plain = raw text output (ideal for terminals)
// text/event-stream or other = SSE format (supports resumption)
string? acceptHeader = request.Headers.Accept.FirstOrDefault();
bool useSseFormat = acceptHeader?.Contains("text/plain", StringComparison.OrdinalIgnoreCase) != true;
return await this.StreamToClientAsync(
conversationId: metadata.ConversationId!, cursor: null, useSseFormat, request.HttpContext, cancellationToken);
}
/// <summary>
/// Resumes streaming from a specific cursor position for an existing session.
/// </summary>
/// <remarks>
/// <para>
/// Use this endpoint to resume a stream after disconnection. Pass the conversation ID
/// (from the <c>x-conversation-id</c> response header) and the last received cursor
/// (Redis stream entry ID) to continue from where you left off.
/// </para>
/// <para>
/// If no cursor is provided, streaming starts from the beginning of the stream.
/// This allows clients to replay the entire response if needed.
/// </para>
/// <para>
/// The response format depends on the <c>Accept</c> header:
/// <list type="bullet">
/// <item><c>text/plain</c>: Returns raw text output, ideal for terminal display with curl</item>
/// <item><c>text/event-stream</c> or other: Returns Server-Sent Events (SSE) with cursor support</item>
/// </list>
/// </para>
/// </remarks>
/// <param name="request">The HTTP request. Use the <c>cursor</c> query parameter to specify the cursor position.</param>
/// <param name="conversationId">The conversation ID to stream from.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A streaming response in the format specified by the Accept header.</returns>
[Function(nameof(StreamAsync))]
public async Task<IActionResult> StreamAsync(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "agent/stream/{conversationId}")] HttpRequest request,
string conversationId,
CancellationToken cancellationToken)
{
if (string.IsNullOrWhiteSpace(conversationId))
{
return new BadRequestObjectResult("Conversation ID is required.");
}
// Get the cursor from query string (optional)
string? cursor = request.Query["cursor"].FirstOrDefault();
this._logger.LogInformation(
"Resuming stream for conversation {ConversationId} from cursor: {Cursor}",
conversationId,
cursor ?? "(beginning)");
// Check Accept header to determine response format
// text/plain = raw text output (ideal for terminals)
// text/event-stream or other = SSE format (supports cursor-based resumption)
string? acceptHeader = request.Headers.Accept.FirstOrDefault();
bool useSseFormat = acceptHeader?.Contains("text/plain", StringComparison.OrdinalIgnoreCase) != true;
return await this.StreamToClientAsync(conversationId, cursor, useSseFormat, request.HttpContext, cancellationToken);
}
/// <summary>
/// Streams chunks from the Redis stream to the HTTP response.
/// </summary>
/// <param name="conversationId">The conversation ID to stream from.</param>
/// <param name="cursor">Optional cursor to resume from. If null, streams from the beginning.</param>
/// <param name="useSseFormat">True to use SSE format, false for plain text.</param>
/// <param name="httpContext">The HTTP context for writing the response.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>An empty result after streaming completes.</returns>
private async Task<IActionResult> StreamToClientAsync(
string conversationId,
string? cursor,
bool useSseFormat,
HttpContext httpContext,
CancellationToken cancellationToken)
{
// Set response headers based on format
httpContext.Response.Headers.ContentType = useSseFormat
? "text/event-stream"
: "text/plain; charset=utf-8";
httpContext.Response.Headers.CacheControl = "no-cache";
httpContext.Response.Headers.Connection = "keep-alive";
httpContext.Response.Headers["x-conversation-id"] = conversationId;
// Disable response buffering if supported
httpContext.Features.Get<IHttpResponseBodyFeature>()?.DisableBuffering();
try
{
await foreach (StreamChunk chunk in this._streamHandler.ReadStreamAsync(
conversationId,
cursor,
cancellationToken))
{
if (chunk.Error != null)
{
this._logger.LogWarning("Stream error for conversation {ConversationId}: {Error}", conversationId, chunk.Error);
await WriteErrorAsync(httpContext.Response, chunk.Error, useSseFormat, cancellationToken);
break;
}
if (chunk.IsDone)
{
await WriteEndOfStreamAsync(httpContext.Response, chunk.EntryId, useSseFormat, cancellationToken);
break;
}
if (chunk.Text != null)
{
await WriteChunkAsync(httpContext.Response, chunk, useSseFormat, cancellationToken);
}
}
}
catch (OperationCanceledException)
{
this._logger.LogInformation("Client disconnected from stream {ConversationId}", conversationId);
}
return new EmptyResult();
}
/// <summary>
/// Writes a text chunk to the response.
/// </summary>
private static async Task WriteChunkAsync(
HttpResponse response,
StreamChunk chunk,
bool useSseFormat,
CancellationToken cancellationToken)
{
if (useSseFormat)
{
await WriteSSEEventAsync(response, "message", chunk.Text!, chunk.EntryId);
}
else
{
await response.WriteAsync(chunk.Text!, cancellationToken);
}
await response.Body.FlushAsync(cancellationToken);
}
/// <summary>
/// Writes an end-of-stream marker to the response.
/// </summary>
private static async Task WriteEndOfStreamAsync(
HttpResponse response,
string entryId,
bool useSseFormat,
CancellationToken cancellationToken)
{
if (useSseFormat)
{
await WriteSSEEventAsync(response, "done", "[DONE]", entryId);
}
else
{
await response.WriteAsync("\n", cancellationToken);
}
await response.Body.FlushAsync(cancellationToken);
}
/// <summary>
/// Writes an error message to the response.
/// </summary>
private static async Task WriteErrorAsync(
HttpResponse response,
string error,
bool useSseFormat,
CancellationToken cancellationToken)
{
if (useSseFormat)
{
await WriteSSEEventAsync(response, "error", error, null);
}
else
{
await response.WriteAsync($"\n[Error: {error}]\n", cancellationToken);
}
await response.Body.FlushAsync(cancellationToken);
}
/// <summary>
/// Writes a Server-Sent Event to the response stream.
/// </summary>
private static async Task WriteSSEEventAsync(
HttpResponse response,
string eventType,
string data,
string? id)
{
StringBuilder sb = new();
// Include the ID if provided (used as cursor for resumption)
if (!string.IsNullOrEmpty(id))
{
sb.AppendLine($"id: {id}");
}
sb.AppendLine($"event: {eventType}");
sb.AppendLine($"data: {data}");
sb.AppendLine(); // Empty line marks end of event
await response.WriteAsync(sb.ToString());
}
}
@@ -0,0 +1,100 @@
// Copyright (c) Microsoft. All rights reserved.
// This sample demonstrates how to implement reliable streaming for durable agents using Redis Streams.
// It exposes two HTTP endpoints:
// 1. Create - Starts an agent run and streams responses back via Server-Sent Events (SSE)
// 2. Stream - Resumes a stream from a specific cursor position, enabling reliable message delivery
//
// This pattern is inspired by OpenAI's background mode for the Responses API, which allows clients
// to disconnect and reconnect to ongoing agent responses without losing messages.
using Azure;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI.DurableTask;
using Microsoft.Agents.AI.Hosting.AzureFunctions;
using Microsoft.Azure.Functions.Worker.Builder;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using OpenAI.Chat;
using ReliableStreaming;
using StackExchange.Redis;
// Get the Azure OpenAI endpoint and deployment name from environment variables.
string endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT")
?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
string deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT")
?? throw new InvalidOperationException("AZURE_OPENAI_DEPLOYMENT is not set.");
// Get Redis connection string from environment variable.
string redisConnectionString = Environment.GetEnvironmentVariable("REDIS_CONNECTION_STRING")
?? "localhost:6379";
// Get the Redis stream TTL from environment variable (default: 10 minutes).
int redisStreamTtlMinutes = int.TryParse(
Environment.GetEnvironmentVariable("REDIS_STREAM_TTL_MINUTES"),
out int ttlMinutes) ? ttlMinutes : 10;
// Use Azure Key Credential if provided, otherwise use Azure CLI Credential.
string? azureOpenAiKey = System.Environment.GetEnvironmentVariable("AZURE_OPENAI_KEY");
AzureOpenAIClient client = !string.IsNullOrEmpty(azureOpenAiKey)
? new AzureOpenAIClient(new Uri(endpoint), new AzureKeyCredential(azureOpenAiKey))
: new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential());
// Travel Planner agent instructions - designed to produce longer responses for demonstrating streaming.
const string TravelPlannerName = "TravelPlanner";
const string TravelPlannerInstructions =
"""
You are an expert travel planner who creates detailed, personalized travel itineraries.
When asked to plan a trip, you should:
1. Create a comprehensive day-by-day itinerary
2. Include specific recommendations for activities, restaurants, and attractions
3. Provide practical tips for each destination
4. Consider weather and local events when making recommendations
5. Include estimated times and logistics between activities
Always use the available tools to get current weather forecasts and local events
for the destination to make your recommendations more relevant and timely.
Format your response with clear headings for each day and include emoji icons
to make the itinerary easy to scan and visually appealing.
""";
// Configure the function app to host the AI agent.
FunctionsApplicationBuilder builder = FunctionsApplication
.CreateBuilder(args)
.ConfigureFunctionsWebApplication()
.ConfigureDurableAgents(options =>
{
// Define the Travel Planner agent with tools for weather and events
options.AddAIAgentFactory(TravelPlannerName, sp =>
{
return client.GetChatClient(deploymentName).CreateAIAgent(
instructions: TravelPlannerInstructions,
name: TravelPlannerName,
services: sp,
tools: [
AIFunctionFactory.Create(TravelTools.GetWeatherForecast),
AIFunctionFactory.Create(TravelTools.GetLocalEvents),
]);
});
});
// Register Redis connection as a singleton
builder.Services.AddSingleton<IConnectionMultiplexer>(_ =>
ConnectionMultiplexer.Connect(redisConnectionString));
// Register the Redis stream response handler - this captures agent responses
// and publishes them to Redis Streams for reliable delivery.
// Registered as both the concrete type (for FunctionTriggers) and the interface (for the agent framework).
builder.Services.AddSingleton(sp =>
new RedisStreamResponseHandler(
sp.GetRequiredService<IConnectionMultiplexer>(),
TimeSpan.FromMinutes(redisStreamTtlMinutes)));
builder.Services.AddSingleton<IAgentResponseHandler>(sp =>
sp.GetRequiredService<RedisStreamResponseHandler>());
using IHost app = builder.Build();
app.Run();
@@ -0,0 +1,264 @@
# Reliable Streaming with Redis
This sample demonstrates how to implement reliable streaming for durable agents using Redis Streams as a message broker. It enables clients to disconnect and reconnect to ongoing agent responses without losing messages, inspired by [OpenAI's background mode](https://platform.openai.com/docs/guides/background) for the Responses API.
## Key Concepts Demonstrated
- **Reliable message delivery**: Agent responses are persisted to Redis Streams, allowing clients to resume from any point
- **Content negotiation**: Use `Accept: text/plain` for raw terminal output, or `Accept: text/event-stream` for SSE format
- **Server-Sent Events (SSE)**: Standard streaming format that works with `curl`, browsers, and most HTTP clients
- **Cursor-based resumption**: Each SSE event includes an `id` field that can be used to resume the stream
- **Fire-and-forget agent invocation**: The agent runs in the background while the client streams from Redis via an HTTP trigger function
## Environment Setup
See the [README.md](../README.md) file in the parent directory for more information on how to configure the environment, including how to install and run common sample dependencies.
### Additional Requirements: Redis
This sample requires a Redis instance. Start a local Redis instance using Docker:
```bash
docker run -d --name redis -p 6379:6379 redis:latest
```
To verify Redis is running:
```bash
docker ps | grep redis
```
## Running the Sample
Start the Azure Functions host:
```bash
func start
```
### 1. Test Streaming with curl
Open a new terminal and start a travel planning request. Use the `-i` flag to see response headers (including the conversation ID) and `Accept: text/plain` for raw text output:
**Bash (Linux/macOS/WSL):**
```bash
curl -i -N -X POST http://localhost:7071/api/agent/create \
-H "Content-Type: text/plain" \
-H "Accept: text/plain" \
-d "Plan a 7-day trip to Tokyo, Japan for next month. Include daily activities, restaurant recommendations, and tips for getting around."
```
**PowerShell:**
```powershell
curl -i -N -X POST http://localhost:7071/api/agent/create `
-H "Content-Type: text/plain" `
-H "Accept: text/plain" `
-d "Plan a 7-day trip to Tokyo, Japan for next month. Include daily activities, restaurant recommendations, and tips for getting around."
```
You'll first see the response headers, including:
```text
HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
x-conversation-id: @dafx-travelplanner@a1b2c3d4e5f67890abcdef1234567890
...
```
Then the agent's response will stream to your terminal in chunks, similar to a ChatGPT-style experience (though not character-by-character).
> **Note:** The `-N` flag in curl disables output buffering, which is essential for seeing the stream in real-time. The `-i` flag includes the HTTP headers in the output.
### 2. Demonstrate Stream Interruption and Resumption
This is the key feature of reliable streaming! Follow these steps to see it in action:
#### Step 1: Start a stream and note the conversation ID
Run the curl command from step 1. Watch for the `x-conversation-id` header in the response - **copy this value**, you'll need it to resume.
```text
x-conversation-id: @dafx-travelplanner@a1b2c3d4e5f67890abcdef1234567890
```
#### Step 2: Interrupt the stream
While the agent is still generating text, press **`Ctrl+C`** to interrupt the stream. The agent continues running in the background - your messages are being saved to Redis!
#### Step 3: Resume the stream
Use the conversation ID you copied to resume streaming from where you left off. Include the `Accept: text/plain` header to get raw text output:
**Bash (Linux/macOS/WSL):**
```bash
# Replace with your actual conversation ID from the x-conversation-id header
CONVERSATION_ID="@dafx-travelplanner@a1b2c3d4e5f67890abcdef1234567890"
curl -N -H "Accept: text/plain" "http://localhost:7071/api/agent/stream/${CONVERSATION_ID}"
```
**PowerShell:**
```powershell
# Replace with your actual conversation ID from the x-conversation-id header
$conversationId = "@dafx-travelplanner@a1b2c3d4e5f67890abcdef1234567890"
curl -N -H "Accept: text/plain" "http://localhost:7071/api/agent/stream/$conversationId"
```
You'll see the **entire response replayed from the beginning**, including the parts you already received before interrupting.
#### Step 4 (Advanced): Resume from a specific cursor
If you're using SSE format, each event includes an `id` field that you can use as a cursor to resume from a specific point:
```bash
# Resume from a specific cursor position
curl -N "http://localhost:7071/api/agent/stream/${CONVERSATION_ID}?cursor=1734567890123-0"
```
### 3. Alternative: SSE Format for Programmatic Clients
If you need the full Server-Sent Events format with cursors for resumable streaming, use `Accept: text/event-stream` (or omit the Accept header):
```bash
curl -i -N -X POST http://localhost:7071/api/agent/create \
-H "Content-Type: text/plain" \
-H "Accept: text/event-stream" \
-d "Plan a 7-day trip to Tokyo, Japan."
```
This returns SSE-formatted events with `id`, `event`, and `data` fields:
```text
id: 1734567890123-0
event: message
data: # 7-Day Tokyo Adventure
id: 1734567890124-0
event: message
data: ## Day 1: Arrival and Exploration
id: 1734567890999-0
event: done
data: [DONE]
```
The `id` field is the Redis stream entry ID - use it as the `cursor` parameter to resume from that exact point.
### Understanding the Response Headers
| Header | Description |
|--------|-------------|
| `x-conversation-id` | The conversation ID (session key). Use this to resume the stream. |
| `Content-Type` | Either `text/plain` or `text/event-stream` depending on your `Accept` header. |
| `Cache-Control` | Set to `no-cache` to prevent caching of the stream. |
## Architecture Overview
```text
┌─────────────┐ POST /agent/create ┌─────────────────────┐
│ Client │ (Accept: text/plain or SSE)│ Azure Functions │
│ (curl) │ ──────────────────────────► │ (FunctionTriggers) │
└─────────────┘ └──────────┬──────────┘
▲ │
│ Text or SSE stream Signal Entity
│ │
│ ▼
│ ┌─────────────────────┐
│ │ AgentEntity │
│ │ (Durable Entity) │
│ └──────────┬──────────┘
│ │
│ IAgentResponseHandler
│ │
│ ▼
│ ┌─────────────────────┐
│ │ RedisStreamResponse │
│ │ Handler │
│ └──────────┬──────────┘
│ │
│ XADD (write)
│ │
│ ▼
│ ┌─────────────────────┐
└─────────── XREAD (poll) ────────── │ Redis Streams │
│ (Durable Log) │
└─────────────────────┘
```
### Data Flow
1. **Client sends prompt**: The `Create` endpoint receives the prompt and generates a new agent thread.
2. **Agent invoked**: The durable entity (`AgentEntity`) is signaled to run the travel planner agent. This is fire-and-forget from the HTTP request's perspective.
3. **Responses captured**: As the agent generates responses, `RedisStreamResponseHandler` (implementing `IAgentResponseHandler`) extracts the text from each `AgentRunResponseUpdate` and publishes it to a Redis Stream keyed by session ID.
4. **Client polls Redis**: The HTTP response streams events by polling the Redis Stream. For SSE format, each event includes the Redis entry ID as the `id` field.
5. **Resumption**: If the client disconnects, it can call the `Stream` endpoint with the conversation ID (from the `x-conversation-id` header) and optionally the last received cursor to resume from that point.
## Message Delivery Guarantees
This sample provides **at-least-once delivery** with the following characteristics:
- **Durability**: Messages are persisted to Redis Streams with configurable TTL (default: 10 minutes).
- **Ordering**: Messages are delivered in order within a session.
- **Resumption**: Clients can resume from any point using cursor-based pagination.
- **Replay**: Clients can replay the entire stream by omitting the cursor.
### Important Considerations
- **No exactly-once delivery**: If a client disconnects exactly when receiving a message, it may receive that message again upon resumption. Clients should handle duplicate messages idempotently.
- **TTL expiration**: Streams expire after the configured TTL. Clients cannot resume streams that have expired.
- **Redis guarantees**: Redis streams are backed by Redis persistence mechanisms (RDB/AOF). Ensure your Redis instance is configured for durability as needed.
## When to Use These Patterns
The patterns demonstrated in this sample are ideal for:
- **Long-running agent tasks**: When agent responses take minutes to complete (e.g., deep research, complex planning)
- **Unreliable network connections**: Mobile apps, unstable WiFi, or connections that may drop
- **Resumable experiences**: Users should be able to close and reopen an app without losing context
- **Background processing**: When you want to fire off a task and check on it later
These patterns may be overkill for:
- **Simple, fast responses**: If responses complete in a few seconds, standard streaming is simpler
- **Stateless interactions**: If there's no need to resume or replay conversations
- **Very high throughput**: Redis adds latency; for maximum throughput, direct streaming may be better
## Configuration
| Environment Variable | Description | Default |
|---------------------|-------------|---------|
| `REDIS_CONNECTION_STRING` | Redis connection string | `localhost:6379` |
| `REDIS_STREAM_TTL_MINUTES` | How long streams are retained after last write | `10` |
| `AZURE_OPENAI_ENDPOINT` | Azure OpenAI endpoint URL | (required) |
| `AZURE_OPENAI_DEPLOYMENT` | Azure OpenAI deployment name | (required) |
| `AZURE_OPENAI_KEY` | API key (optional, uses Azure CLI auth if not set) | (optional) |
## Cleanup
To stop and remove the Redis Docker containers:
```bash
docker stop redis
docker rm redis
```
## Disclaimer
> ⚠️ **This sample is for illustration purposes only and is not intended to be production-ready.**
>
> A production implementation should consider:
>
> - Redis cluster configuration for high availability
> - Authentication and authorization for the streaming endpoints
> - Rate limiting and abuse prevention
> - Monitoring and alerting for stream health
> - Graceful handling of Redis failures
@@ -0,0 +1,213 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Runtime.CompilerServices;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.DurableTask;
using StackExchange.Redis;
namespace ReliableStreaming;
/// <summary>
/// Represents a chunk of data read from a Redis stream.
/// </summary>
/// <param name="EntryId">The Redis stream entry ID (can be used as a cursor for resumption).</param>
/// <param name="Text">The text content of the chunk, or null if this is a completion/error marker.</param>
/// <param name="IsDone">True if this chunk marks the end of the stream.</param>
/// <param name="Error">An error message if something went wrong, or null otherwise.</param>
public readonly record struct StreamChunk(string EntryId, string? Text, bool IsDone, string? Error);
/// <summary>
/// An implementation of <see cref="IAgentResponseHandler"/> that publishes agent response updates
/// to Redis Streams for reliable delivery. This enables clients to disconnect and reconnect
/// to ongoing agent responses without losing messages.
/// </summary>
/// <remarks>
/// <para>
/// Redis Streams provide a durable, append-only log that supports consumer groups and message
/// acknowledgment. This implementation uses auto-generated IDs (which are timestamp-based)
/// as sequence numbers, allowing clients to resume from any point in the stream.
/// </para>
/// <para>
/// Each agent session gets its own Redis Stream, keyed by session ID. The stream entries
/// contain text chunks extracted from <see cref="AgentRunResponseUpdate"/> objects.
/// </para>
/// </remarks>
public sealed class RedisStreamResponseHandler : IAgentResponseHandler
{
private const int MaxEmptyReads = 300; // 5 minutes at 1 second intervals
private const int PollIntervalMs = 1000;
private readonly IConnectionMultiplexer _redis;
private readonly TimeSpan _streamTtl;
/// <summary>
/// Initializes a new instance of the <see cref="RedisStreamResponseHandler" /> class.
/// </summary>
/// <param name="redis">The Redis connection multiplexer.</param>
/// <param name="streamTtl">The time-to-live for stream entries. Streams will expire after this duration of inactivity.</param>
public RedisStreamResponseHandler(IConnectionMultiplexer redis, TimeSpan streamTtl)
{
this._redis = redis;
this._streamTtl = streamTtl;
}
/// <inheritdoc/>
public async ValueTask OnStreamingResponseUpdateAsync(
IAsyncEnumerable<AgentRunResponseUpdate> messageStream,
CancellationToken cancellationToken)
{
// Get the current session ID from the DurableAgentContext
// This is set by the AgentEntity before invoking the response handler
DurableAgentContext? context = DurableAgentContext.Current;
if (context is null)
{
throw new InvalidOperationException(
"DurableAgentContext.Current is not set. This handler must be used within a durable agent context.");
}
// Get conversation ID from the current thread context, which is only available in the context of
// a durable agent execution.
string conversationId = context.CurrentThread.GetService<AgentThreadMetadata>()?.ConversationId
?? throw new InvalidOperationException("Unable to determine conversation ID from the current thread.");
string streamKey = GetStreamKey(conversationId);
IDatabase db = this._redis.GetDatabase();
int sequenceNumber = 0;
await foreach (AgentRunResponseUpdate update in messageStream.WithCancellation(cancellationToken))
{
// Extract just the text content - this avoids serialization round-trip issues
string text = update.Text;
// Only publish non-empty text chunks
if (!string.IsNullOrEmpty(text))
{
// Create the stream entry with the text and metadata
NameValueEntry[] entries =
[
new NameValueEntry("text", text),
new NameValueEntry("sequence", sequenceNumber++),
new NameValueEntry("timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()),
];
// Add to the Redis Stream with auto-generated ID (timestamp-based)
await db.StreamAddAsync(streamKey, entries);
// Refresh the TTL on each write to keep the stream alive during active streaming
await db.KeyExpireAsync(streamKey, this._streamTtl);
}
}
// Add a sentinel entry to mark the end of the stream
NameValueEntry[] endEntries =
[
new NameValueEntry("text", ""),
new NameValueEntry("sequence", sequenceNumber),
new NameValueEntry("timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()),
new NameValueEntry("done", "true"),
];
await db.StreamAddAsync(streamKey, endEntries);
// Set final TTL - the stream will be cleaned up after this duration
await db.KeyExpireAsync(streamKey, this._streamTtl);
}
/// <inheritdoc/>
public ValueTask OnAgentResponseAsync(AgentRunResponse message, CancellationToken cancellationToken)
{
// This handler is optimized for streaming responses.
// For non-streaming responses, we don't need to store in Redis since
// the response is returned directly to the caller.
return ValueTask.CompletedTask;
}
/// <summary>
/// Reads chunks from a Redis stream for the given session, yielding them as they become available.
/// </summary>
/// <param name="conversationId">The conversation ID to read from.</param>
/// <param name="cursor">Optional cursor to resume from. If null, reads from the beginning.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>An async enumerable of stream chunks.</returns>
public async IAsyncEnumerable<StreamChunk> ReadStreamAsync(
string conversationId,
string? cursor,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
string streamKey = GetStreamKey(conversationId);
IDatabase db = this._redis.GetDatabase();
string startId = string.IsNullOrEmpty(cursor) ? "0-0" : cursor;
int emptyReadCount = 0;
bool hasSeenData = false;
while (!cancellationToken.IsCancellationRequested)
{
StreamEntry[]? entries = null;
string? errorMessage = null;
try
{
entries = await db.StreamReadAsync(streamKey, startId, count: 100);
}
catch (Exception ex)
{
errorMessage = ex.Message;
}
if (errorMessage != null)
{
yield return new StreamChunk(startId, null, false, errorMessage);
yield break;
}
// entries is guaranteed to be non-null if errorMessage is null
if (entries!.Length == 0)
{
if (!hasSeenData)
{
emptyReadCount++;
if (emptyReadCount >= MaxEmptyReads)
{
yield return new StreamChunk(
startId,
null,
false,
$"Stream not found or timed out after {MaxEmptyReads * PollIntervalMs / 1000} seconds");
yield break;
}
}
await Task.Delay(PollIntervalMs, cancellationToken);
continue;
}
hasSeenData = true;
foreach (StreamEntry entry in entries)
{
startId = entry.Id.ToString();
string? text = entry["text"];
string? done = entry["done"];
if (done == "true")
{
yield return new StreamChunk(startId, null, true, null);
yield break;
}
if (!string.IsNullOrEmpty(text))
{
yield return new StreamChunk(startId, text, false, null);
}
}
}
}
/// <summary>
/// Gets the Redis Stream key for a given conversation ID.
/// </summary>
/// <param name="conversationId">The conversation ID.</param>
/// <returns>The Redis Stream key.</returns>
internal static string GetStreamKey(string conversationId) => $"agent-stream:{conversationId}";
}
@@ -0,0 +1,161 @@
// Copyright (c) Microsoft. All rights reserved.
using System.ComponentModel;
namespace ReliableStreaming;
/// <summary>
/// Mock travel tools that return hardcoded data for demonstration purposes.
/// In a real application, these would call actual weather and events APIs.
/// </summary>
internal static class TravelTools
{
/// <summary>
/// Gets a weather forecast for a destination on a specific date.
/// Returns mock weather data for demonstration purposes.
/// </summary>
/// <param name="destination">The destination city or location.</param>
/// <param name="date">The date for the forecast (e.g., "2025-01-15" or "next Monday").</param>
/// <returns>A weather forecast summary.</returns>
[Description("Gets the weather forecast for a destination on a specific date. Use this to provide weather-aware recommendations in the itinerary.")]
public static string GetWeatherForecast(string destination, string date)
{
// Mock weather data based on destination for realistic responses
Dictionary<string, (string condition, int highF, int lowF)> weatherByRegion = new(StringComparer.OrdinalIgnoreCase)
{
["Tokyo"] = ("Partly cloudy with a chance of light rain", 58, 45),
["Paris"] = ("Overcast with occasional drizzle", 52, 41),
["New York"] = ("Clear and cold", 42, 28),
["London"] = ("Foggy morning, clearing in afternoon", 48, 38),
["Sydney"] = ("Sunny and warm", 82, 68),
["Rome"] = ("Sunny with light breeze", 62, 48),
["Barcelona"] = ("Partly sunny", 59, 47),
["Amsterdam"] = ("Cloudy with light rain", 46, 38),
["Dubai"] = ("Sunny and hot", 85, 72),
["Singapore"] = ("Tropical thunderstorms in afternoon", 88, 77),
["Bangkok"] = ("Hot and humid, afternoon showers", 91, 78),
["Los Angeles"] = ("Sunny and pleasant", 72, 55),
["San Francisco"] = ("Morning fog, afternoon sun", 62, 52),
["Seattle"] = ("Rainy with breaks", 48, 40),
["Miami"] = ("Warm and sunny", 78, 65),
["Honolulu"] = ("Tropical paradise weather", 82, 72),
};
// Find a matching destination or use a default
(string condition, int highF, int lowF) forecast = ("Partly cloudy", 65, 50);
foreach (KeyValuePair<string, (string, int, int)> entry in weatherByRegion)
{
if (destination.Contains(entry.Key, StringComparison.OrdinalIgnoreCase))
{
forecast = entry.Value;
break;
}
}
return $"""
Weather forecast for {destination} on {date}:
Conditions: {forecast.condition}
High: {forecast.highF}°F ({(forecast.highF - 32) * 5 / 9}°C)
Low: {forecast.lowF}°F ({(forecast.lowF - 32) * 5 / 9}°C)
Recommendation: {GetWeatherRecommendation(forecast.condition)}
""";
}
/// <summary>
/// Gets local events happening at a destination around a specific date.
/// Returns mock event data for demonstration purposes.
/// </summary>
/// <param name="destination">The destination city or location.</param>
/// <param name="date">The date to search for events (e.g., "2025-01-15" or "next week").</param>
/// <returns>A list of local events and activities.</returns>
[Description("Gets local events and activities happening at a destination around a specific date. Use this to suggest timely activities and experiences.")]
public static string GetLocalEvents(string destination, string date)
{
// Mock events data based on destination
Dictionary<string, string[]> eventsByCity = new(StringComparer.OrdinalIgnoreCase)
{
["Tokyo"] = [
"🎭 Kabuki Theater Performance at Kabukiza Theatre - Traditional Japanese drama",
"🌸 Winter Illuminations at Yoyogi Park - Spectacular light displays",
"🍜 Ramen Festival at Tokyo Station - Sample ramen from across Japan",
"🎮 Gaming Expo at Tokyo Big Sight - Latest video games and technology",
],
["Paris"] = [
"🎨 Impressionist Exhibition at Musée d'Orsay - Extended evening hours",
"🍷 Wine Tasting Tour in Le Marais - Local sommelier guided",
"🎵 Jazz Night at Le Caveau de la Huchette - Historic jazz club",
"🥐 French Pastry Workshop - Learn from master pâtissiers",
],
["New York"] = [
"🎭 Broadway Show: Hamilton - Limited engagement performances",
"🏀 Knicks vs Lakers at Madison Square Garden",
"🎨 Modern Art Exhibit at MoMA - New installations",
"🍕 Pizza Walking Tour of Brooklyn - Artisan pizzerias",
],
["London"] = [
"👑 Royal Collection Exhibition at Buckingham Palace",
"🎭 West End Musical: The Phantom of the Opera",
"🍺 Craft Beer Festival at Brick Lane",
"🎪 Winter Wonderland at Hyde Park - Rides and markets",
],
["Sydney"] = [
"🏄 Pro Surfing Competition at Bondi Beach",
"🎵 Opera at Sydney Opera House - La Bohème",
"🦘 Wildlife Night Safari at Taronga Zoo",
"🍽️ Harbor Dinner Cruise with fireworks",
],
["Rome"] = [
"🏛️ After-Hours Vatican Tour - Skip the crowds",
"🍝 Pasta Making Class in Trastevere",
"🎵 Classical Concert at Borghese Gallery",
"🍷 Wine Tasting in Roman Cellars",
],
};
// Find events for the destination or use generic events
string[] events = [
"🎭 Local theater performance",
"🍽️ Food and wine festival",
"🎨 Art gallery opening",
"🎵 Live music at local venues",
];
foreach (KeyValuePair<string, string[]> entry in eventsByCity)
{
if (destination.Contains(entry.Key, StringComparison.OrdinalIgnoreCase))
{
events = entry.Value;
break;
}
}
string eventList = string.Join("\n• ", events);
return $"""
Local events in {destination} around {date}:
• {eventList}
💡 Tip: Book popular events in advance as they may sell out quickly!
""";
}
private static string GetWeatherRecommendation(string condition)
{
// Use case-insensitive comparison instead of ToLowerInvariant() to satisfy CA1308
return condition switch
{
string c when c.Contains("rain", StringComparison.OrdinalIgnoreCase) || c.Contains("drizzle", StringComparison.OrdinalIgnoreCase) =>
"Bring an umbrella and waterproof jacket. Consider indoor activities for backup.",
string c when c.Contains("fog", StringComparison.OrdinalIgnoreCase) =>
"Morning visibility may be limited. Plan outdoor sightseeing for afternoon.",
string c when c.Contains("cold", StringComparison.OrdinalIgnoreCase) =>
"Layer up with warm clothing. Hot drinks and cozy cafés recommended.",
string c when c.Contains("hot", StringComparison.OrdinalIgnoreCase) || c.Contains("warm", StringComparison.OrdinalIgnoreCase) =>
"Stay hydrated and use sunscreen. Plan strenuous activities for cooler morning hours.",
string c when c.Contains("thunder", StringComparison.OrdinalIgnoreCase) || c.Contains("storm", StringComparison.OrdinalIgnoreCase) =>
"Keep an eye on weather updates. Have indoor alternatives ready.",
_ => "Pleasant conditions expected. Great day for outdoor exploration!"
};
}
}
@@ -0,0 +1,21 @@
{
"version": "2.0",
"logging": {
"logLevel": {
"Microsoft.Agents.AI.DurableTask": "Information",
"Microsoft.Agents.AI.Hosting.AzureFunctions": "Information",
"DurableTask": "Information",
"Microsoft.DurableTask": "Information",
"ReliableStreaming": "Information"
}
},
"extensions": {
"durableTask": {
"hubName": "default",
"storageProvider": {
"type": "AzureManaged",
"connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
}
}
}
}
@@ -0,0 +1,12 @@
{
"IsEncrypted": false,
"Values": {
"FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None",
"AZURE_OPENAI_ENDPOINT": "<AZURE_OPENAI_ENDPOINT>",
"AZURE_OPENAI_DEPLOYMENT": "<AZURE_OPENAI_DEPLOYMENT>",
"REDIS_CONNECTION_STRING": "localhost:6379",
"REDIS_STREAM_TTL_MINUTES": "10"
}
}
+1
View File
@@ -9,6 +9,7 @@ This directory contains samples for Azure Functions.
- **[05_AgentOrchestration_HITL](05_AgentOrchestration_HITL)**: A sample that demonstrates how to implement a human-in-the-loop workflow using durable orchestration, including external event handling for human approval.
- **[06_LongRunningTools](06_LongRunningTools)**: A sample that demonstrates how agents can start and interact with durable orchestrations from tool calls to enable long-running tool scenarios.
- **[07_AgentAsMcpTool](07_AgentAsMcpTool)**: A sample that demonstrates how to configure durable AI agents to be accessible as Model Context Protocol (MCP) tools.
- **[08_ReliableStreaming](08_ReliableStreaming)**: A sample that demonstrates how to implement reliable streaming for durable agents using Redis Streams, enabling clients to disconnect and reconnect without losing messages.
## Running the Samples
@@ -19,6 +19,7 @@ public sealed class SamplesValidation(ITestOutputHelper outputHelper) : IAsyncLi
private const string AzureFunctionsPort = "7071";
private const string AzuritePort = "10000";
private const string DtsPort = "8080";
private const string RedisPort = "6379";
private static readonly string s_dotnetTargetFramework = GetTargetFramework();
private static readonly HttpClient s_sharedHttpClient = new();
@@ -392,6 +393,136 @@ public sealed class SamplesValidation(ITestOutputHelper outputHelper) : IAsyncLi
});
}
[Fact]
public async Task ReliableStreamingSampleValidationAsync()
{
string samplePath = Path.Combine(s_samplesPath, "08_ReliableStreaming");
await this.RunSampleTestAsync(samplePath, async (logs) =>
{
Uri createUri = new($"http://localhost:{AzureFunctionsPort}/api/agent/create");
this._outputHelper.WriteLine($"Starting reliable streaming agent via POST request to {createUri}...");
// Test the agent endpoint with a simple prompt
const string RequestBody = "Plan a 3-day trip to Seattle. Include daily activities.";
using HttpContent content = new StringContent(RequestBody, Encoding.UTF8, "text/plain");
using HttpRequestMessage request = new(HttpMethod.Post, createUri)
{
Content = content
};
request.Headers.Add("Accept", "text/plain");
using HttpResponseMessage response = await s_sharedHttpClient.SendAsync(
request,
HttpCompletionOption.ResponseHeadersRead);
// The response should be successful
Assert.True(response.IsSuccessStatusCode, $"Agent request failed with status: {response.StatusCode}");
Assert.Equal("text/plain", response.Content.Headers.ContentType?.MediaType);
// The response headers should include the conversation ID
string? conversationId = response.Headers.GetValues("x-conversation-id")?.FirstOrDefault();
Assert.NotNull(conversationId);
Assert.NotEmpty(conversationId);
this._outputHelper.WriteLine($"Agent conversation ID: {conversationId}");
// Read the streamed response
using Stream responseStream = await response.Content.ReadAsStreamAsync();
using StreamReader reader = new(responseStream);
StringBuilder responseText = new();
char[] buffer = new char[1024];
int bytesRead;
// Read for a reasonable amount of time to get some content
using CancellationTokenSource readTimeout = new(TimeSpan.FromSeconds(30));
try
{
while (!readTimeout.Token.IsCancellationRequested)
{
bytesRead = await reader.ReadAsync(buffer, 0, buffer.Length);
if (bytesRead == 0)
{
// Check if we've received enough content
if (responseText.Length > 50)
{
break;
}
await Task.Delay(100, readTimeout.Token);
continue;
}
responseText.Append(buffer, 0, bytesRead);
if (responseText.Length > 200)
{
// We've received enough content to validate
break;
}
}
}
catch (OperationCanceledException)
{
// Timeout is acceptable if we got some content
}
string responseContent = responseText.ToString();
Assert.True(responseContent.Length > 0, "Expected to receive some streamed content");
this._outputHelper.WriteLine($"Received {responseContent.Length} characters of streamed content");
// Test resumption by calling the stream endpoint
Uri streamUri = new($"http://localhost:{AzureFunctionsPort}/api/agent/stream/{conversationId}");
this._outputHelper.WriteLine($"Testing stream resumption via GET request to {streamUri}...");
using HttpRequestMessage streamRequest = new(HttpMethod.Get, streamUri);
streamRequest.Headers.Add("Accept", "text/plain");
using HttpResponseMessage streamResponse = await s_sharedHttpClient.SendAsync(
streamRequest,
HttpCompletionOption.ResponseHeadersRead);
Assert.True(streamResponse.IsSuccessStatusCode, $"Stream request failed with status: {streamResponse.StatusCode}");
Assert.Equal("text/plain", streamResponse.Content.Headers.ContentType?.MediaType);
// Verify the conversation ID header is present
string? resumedConversationId = streamResponse.Headers.GetValues("x-conversation-id")?.FirstOrDefault();
Assert.Equal(conversationId, resumedConversationId);
// Read some content from the resumed stream
using Stream resumedStream = await streamResponse.Content.ReadAsStreamAsync();
using StreamReader resumedReader = new(resumedStream);
StringBuilder resumedText = new();
using CancellationTokenSource resumedReadTimeout = new(TimeSpan.FromSeconds(10));
try
{
while (!resumedReadTimeout.Token.IsCancellationRequested)
{
bytesRead = await resumedReader.ReadAsync(buffer, 0, buffer.Length);
if (bytesRead == 0)
{
if (resumedText.Length > 50)
{
break;
}
await Task.Delay(100, resumedReadTimeout.Token);
continue;
}
resumedText.Append(buffer, 0, bytesRead);
if (resumedText.Length > 100)
{
break;
}
}
}
catch (OperationCanceledException)
{
// Timeout is acceptable if we got some content
}
string resumedContent = resumedText.ToString();
Assert.True(resumedContent.Length > 0, "Expected to receive some content from resumed stream");
this._outputHelper.WriteLine($"Received {resumedContent.Length} characters from resumed stream");
});
}
private async Task<string> InvokeMcpToolAsync(McpClient mcpClient, string toolName, string query)
{
this._outputHelper.WriteLine($"Invoking MCP tool '{toolName}'...");
@@ -482,6 +613,21 @@ public sealed class SamplesValidation(ITestOutputHelper outputHelper) : IAsyncLi
message: "DTS emulator is running",
timeout: TimeSpan.FromSeconds(30));
}
// Start Redis if it's not already running
if (!await this.IsRedisRunningAsync())
{
await this.StartDockerContainerAsync(
containerName: "redis",
image: "redis:latest",
ports: ["-p", "6379:6379"]);
// Wait for Redis
await this.WaitForConditionAsync(
condition: this.IsRedisRunningAsync,
message: "Redis is running",
timeout: TimeSpan.FromSeconds(30));
}
}
private async Task<bool> IsAzuriteRunningAsync()
@@ -562,6 +708,49 @@ public sealed class SamplesValidation(ITestOutputHelper outputHelper) : IAsyncLi
}
}
private async Task<bool> IsRedisRunningAsync()
{
this._outputHelper.WriteLine($"Checking if Redis is running at localhost:{RedisPort}...");
try
{
using CancellationTokenSource timeoutCts = new(TimeSpan.FromSeconds(30));
ProcessStartInfo startInfo = new()
{
FileName = "docker",
Arguments = "exec redis redis-cli ping",
UseShellExecute = false,
RedirectStandardOutput = true,
RedirectStandardError = true,
CreateNoWindow = true
};
using Process process = new() { StartInfo = startInfo };
if (!process.Start())
{
this._outputHelper.WriteLine("Failed to start docker exec command");
return false;
}
string output = await process.StandardOutput.ReadToEndAsync(timeoutCts.Token);
await process.WaitForExitAsync(timeoutCts.Token);
if (process.ExitCode == 0 && output.Contains("PONG", StringComparison.OrdinalIgnoreCase))
{
this._outputHelper.WriteLine("Redis is running");
return true;
}
this._outputHelper.WriteLine($"Redis is not running. Exit code: {process.ExitCode}, Output: {output}");
return false;
}
catch (Exception ex)
{
this._outputHelper.WriteLine($"Redis is not running: {ex.Message}");
return false;
}
}
private async Task StartDockerContainerAsync(string containerName, string image, string[] ports)
{
// Stop existing container if it exists
@@ -646,6 +835,7 @@ public sealed class SamplesValidation(ITestOutputHelper outputHelper) : IAsyncLi
startInfo.EnvironmentVariables["DURABLE_TASK_SCHEDULER_CONNECTION_STRING"] =
$"Endpoint=http://localhost:{DtsPort};TaskHub=default;Authentication=None";
startInfo.EnvironmentVariables["AzureWebJobsStorage"] = "UseDevelopmentStorage=true";
startInfo.EnvironmentVariables["REDIS_CONNECTION_STRING"] = $"localhost:{RedisPort}";
Process process = new() { StartInfo = startInfo };