Files
Chris 904a5b843e Python / .NET Samples - Restructure and Improve Samples (Feature Branc… (#4092)
* Python: .NET Samples - Restructure and Improve Samples (Feature Branch) (#4091)

* Moved by agent (#4094)

* Fix readme links

* .NET Samples - Create `04-hosting` learning path step (#4098)

* Agent move

* Agent reorderd

* Remove A2A section from README 

Removed A2A section from the Getting Started README.

* Agent fixed links

* Fix broken sample links in durable-agents README (#4101)

* Initial plan

* Fix broken internal links in documentation

Co-authored-by: crickman <66376200+crickman@users.noreply.github.com>

* Revert template link changes; keep only durable-agents README fix

Co-authored-by: crickman <66376200+crickman@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: crickman <66376200+crickman@users.noreply.github.com>

* .NET Samples - Create `03-workflows` learning path step (#4102)

* Fix solution project path

* Python: Fix broken markdown links to repo resources (outside /docs) (#4105)

* Initial plan

* Fix broken markdown links to repo resources

Co-authored-by: crickman <66376200+crickman@users.noreply.github.com>

* Update README to rename .NET Workflows Samples section

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: crickman <66376200+crickman@users.noreply.github.com>

* .NET Samples - Create `02-agents` learning path step (#4107)

* .NET: Fix broken relative link in GroupChatToolApproval README (#4108)

* Initial plan

* Fix broken link in GroupChatToolApproval README

Co-authored-by: crickman <66376200+crickman@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: crickman <66376200+crickman@users.noreply.github.com>

* Update labeler configuration for workflow samples

* .NET - Reorder Agents samples to start from Step01 instead of Step04 (#4110)

* Fix solution

* Resolve new sample paths

* Move new AgentSkills and AgentWithMemory_Step04 samples

* Fix link

* Fix readme path

* fix: update stale dotnet/samples/Durable path reference in AGENTS.md

Co-authored-by: crickman <66376200+crickman@users.noreply.github.com>

* Moved new sample

* Update solution

* Resolve merge (new sample)

* Sync to new sample - FoundryAgents_Step21_BingCustomSearch

* Updated README

* .NET Samples - Configuration Naming Update (#4149)

* .NET: Restore AzureFunctions index parity with ConsoleApps under DurableAgents samples (#4221)

* Clean-up `05_host_your_agent`

* Config setting consistency

* Refine samples

* AGENTS.md

* Move new samples

* Re-order samples

* Move new project and fixup solution

* Fixup model config

* Fix up new UT project

---------

Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
2026-02-26 00:56:10 +00:00

217 lines
8.6 KiB
C#

// Copyright (c) Microsoft. All rights reserved.
using System.Runtime.CompilerServices;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.DurableTask;
using StackExchange.Redis;
namespace ReliableStreaming;
/// <summary>
/// Represents a chunk of data read from a Redis stream.
/// </summary>
/// <param name="EntryId">The Redis stream entry ID (can be used as a cursor for resumption).</param>
/// <param name="Text">The text content of the chunk, or null if this is a completion/error marker.</param>
/// <param name="IsDone">True if this chunk marks the end of the stream.</param>
/// <param name="Error">An error message if something went wrong, or null otherwise.</param>
public readonly record struct StreamChunk(string EntryId, string? Text, bool IsDone, string? Error);
/// <summary>
/// An implementation of <see cref="IAgentResponseHandler"/> that publishes agent response updates
/// to Redis Streams for reliable delivery. This enables clients to disconnect and reconnect
/// to ongoing agent responses without losing messages.
/// </summary>
/// <remarks>
/// <para>
/// Redis Streams provide a durable, append-only log that supports consumer groups and message
/// acknowledgment. This implementation uses auto-generated IDs (which are timestamp-based)
/// as sequence numbers, allowing clients to resume from any point in the stream.
/// </para>
/// <para>
/// Each agent session gets its own Redis Stream, keyed by session ID. The stream entries
/// contain text chunks extracted from <see cref="AgentResponseUpdate"/> objects.
/// </para>
/// </remarks>
public sealed class RedisStreamResponseHandler : IAgentResponseHandler
{
private const int MaxEmptyReads = 300; // 5 minutes at 1 second intervals
private const int PollIntervalMs = 1000;
private readonly IConnectionMultiplexer _redis;
private readonly TimeSpan _streamTtl;
/// <summary>
/// Initializes a new instance of the <see cref="RedisStreamResponseHandler" /> class.
/// </summary>
/// <param name="redis">The Redis connection multiplexer.</param>
/// <param name="streamTtl">The time-to-live for stream entries. Streams will expire after this duration of inactivity.</param>
public RedisStreamResponseHandler(IConnectionMultiplexer redis, TimeSpan streamTtl)
{
this._redis = redis;
this._streamTtl = streamTtl;
}
/// <inheritdoc/>
public async ValueTask OnStreamingResponseUpdateAsync(
IAsyncEnumerable<AgentResponseUpdate> messageStream,
CancellationToken cancellationToken)
{
// Get the current session ID from the DurableAgentContext
// This is set by the AgentEntity before invoking the response handler
DurableAgentContext context = DurableAgentContext.Current
?? throw new InvalidOperationException("DurableAgentContext.Current is not set. This handler must be used within a durable agent context.");
// Get conversation ID from the current session context, which is only available in the context of
// a durable agent execution.
string conversationId = context.CurrentSession.GetService<AgentSessionId>().ToString();
if (string.IsNullOrEmpty(conversationId))
{
throw new InvalidOperationException("Unable to determine conversation ID from the current session.");
}
string streamKey = GetStreamKey(conversationId);
IDatabase db = this._redis.GetDatabase();
int sequenceNumber = 0;
await foreach (AgentResponseUpdate update in messageStream.WithCancellation(cancellationToken))
{
// Extract just the text content - this avoids serialization round-trip issues
string text = update.Text;
// Only publish non-empty text chunks
if (!string.IsNullOrEmpty(text))
{
// Create the stream entry with the text and metadata
NameValueEntry[] entries =
[
new NameValueEntry("text", text),
new NameValueEntry("sequence", sequenceNumber++),
new NameValueEntry("timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()),
];
// Add to the Redis Stream with auto-generated ID (timestamp-based)
await db.StreamAddAsync(streamKey, entries);
// Refresh the TTL on each write to keep the stream alive during active streaming
await db.KeyExpireAsync(streamKey, this._streamTtl);
}
}
// Add a sentinel entry to mark the end of the stream
NameValueEntry[] endEntries =
[
new NameValueEntry("text", ""),
new NameValueEntry("sequence", sequenceNumber),
new NameValueEntry("timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()),
new NameValueEntry("done", "true"),
];
await db.StreamAddAsync(streamKey, endEntries);
// Set final TTL - the stream will be cleaned up after this duration
await db.KeyExpireAsync(streamKey, this._streamTtl);
}
/// <inheritdoc/>
public ValueTask OnAgentResponseAsync(AgentResponse message, CancellationToken cancellationToken)
{
// This handler is optimized for streaming responses.
// For non-streaming responses, we don't need to store in Redis since
// the response is returned directly to the caller.
return ValueTask.CompletedTask;
}
/// <summary>
/// Reads chunks from a Redis stream for the given session, yielding them as they become available.
/// </summary>
/// <param name="conversationId">The conversation ID to read from.</param>
/// <param name="cursor">Optional cursor to resume from. If null, reads from the beginning.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>An async enumerable of stream chunks.</returns>
public async IAsyncEnumerable<StreamChunk> ReadStreamAsync(
string conversationId,
string? cursor,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
string streamKey = GetStreamKey(conversationId);
IDatabase db = this._redis.GetDatabase();
string startId = string.IsNullOrEmpty(cursor) ? "0-0" : cursor;
int emptyReadCount = 0;
bool hasSeenData = false;
while (!cancellationToken.IsCancellationRequested)
{
StreamEntry[]? entries = null;
string? errorMessage = null;
try
{
entries = await db.StreamReadAsync(streamKey, startId, count: 100);
}
catch (Exception ex)
{
errorMessage = ex.Message;
}
if (errorMessage != null)
{
yield return new StreamChunk(startId, null, false, errorMessage);
yield break;
}
// entries is guaranteed to be non-null if errorMessage is null
if (entries!.Length == 0)
{
if (!hasSeenData)
{
emptyReadCount++;
if (emptyReadCount >= MaxEmptyReads)
{
yield return new StreamChunk(
startId,
null,
false,
$"Stream not found or timed out after {MaxEmptyReads * PollIntervalMs / 1000} seconds");
yield break;
}
}
await Task.Delay(PollIntervalMs, cancellationToken);
continue;
}
hasSeenData = true;
foreach (StreamEntry entry in entries)
{
startId = entry.Id.ToString();
string? text = entry["text"];
string? done = entry["done"];
if (done == "true")
{
yield return new StreamChunk(startId, null, true, null);
yield break;
}
if (!string.IsNullOrEmpty(text))
{
yield return new StreamChunk(startId, text, false, null);
}
}
}
// If we exited the loop due to cancellation, throw to signal the caller
cancellationToken.ThrowIfCancellationRequested();
}
/// <summary>
/// Gets the Redis Stream key for a given conversation ID.
/// </summary>
/// <param name="conversationId">The conversation ID.</param>
/// <returns>The Redis Stream key.</returns>
internal static string GetStreamKey(string conversationId) => $"agent-stream:{conversationId}";
}