mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
Simplify ClientHeadersScope, drop redundant using/Dispose (#5676)
Wesley pointed out (with a clean demo) that AsyncLocal<T> mutations made
inside an awaited async method do not leak back to the caller after the
method returns - the runtime restores the caller's view automatically.
ClientHeadersAgent.RunCoreAsync and RunCoreStreamingAsync are the only
callers of the scope, both are async methods awaited by their callers,
so the explicit using/Dispose pattern was doing work the runtime already
does for us.
* ClientHeadersScope collapsed to a single Current { get; set; } property
over an AsyncLocal<IReadOnlyDictionary<string,string>?>. Drops Push,
the Scope struct, and Dispose. XML doc explains the AsyncLocal natural-
restoration semantics so the design intent is self-documenting.
* ClientHeadersAgent uses a direct ClientHeadersScope.Current = snapshot
before delegating. Drops the local RunAsyncCoreAsync helper and the
snapshot-passed-as-parameter dance.
* Test 10 renamed to ClientHeadersScope_IsAsyncLocalIsolatedAndAutoRestoresAsync;
drops the LIFO claim, keeps the parallel-isolation assertion, and adds
a Wesley-style 'set inside async, caller sees null on return' assertion.
* Test 12 switches from using ClientHeadersScope.Push to direct
Current = ... with try/finally for test isolation.
Snapshot deep-copy in TrySnapshot stays - it defends against caller
mutating the source Dictionary mid-run, which is independent of the
AsyncLocal restoration mechanism.
This commit is contained in:
committed by
GitHub
Unverified
parent
9faf52de4f
commit
9d8c3f8cb7
@@ -42,23 +42,15 @@ internal sealed class ClientHeadersAgent : DelegatingAIAgent
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var snapshot = TrySnapshot(options);
|
||||
if (snapshot is null)
|
||||
if (snapshot is not null)
|
||||
{
|
||||
return this.InnerAgent.RunAsync(messages, session, options, cancellationToken);
|
||||
// AsyncLocal mutations made inside an awaited async method do not leak back to the
|
||||
// caller after the method returns, so we do not need an explicit restore step here.
|
||||
// See ClientHeadersScope remarks.
|
||||
ClientHeadersScope.Current = snapshot;
|
||||
}
|
||||
|
||||
return RunAsyncCoreAsync(messages, session, options, snapshot, cancellationToken);
|
||||
|
||||
async Task<AgentResponse> RunAsyncCoreAsync(
|
||||
IEnumerable<ChatMessage> innerMessages,
|
||||
AgentSession? innerSession,
|
||||
AgentRunOptions? innerOptions,
|
||||
Dictionary<string, string> innerSnapshot,
|
||||
CancellationToken innerCt)
|
||||
{
|
||||
using var _ = ClientHeadersScope.Push(innerSnapshot);
|
||||
return await this.InnerAgent.RunAsync(innerMessages, innerSession, innerOptions, innerCt).ConfigureAwait(false);
|
||||
}
|
||||
return this.InnerAgent.RunAsync(messages, session, options, cancellationToken);
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
@@ -69,7 +61,10 @@ internal sealed class ClientHeadersAgent : DelegatingAIAgent
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||
{
|
||||
var snapshot = TrySnapshot(options);
|
||||
using var _ = snapshot is null ? default : ClientHeadersScope.Push(snapshot);
|
||||
if (snapshot is not null)
|
||||
{
|
||||
ClientHeadersScope.Current = snapshot;
|
||||
}
|
||||
|
||||
await foreach (var update in this.InnerAgent.RunStreamingAsync(messages, session, options, cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
|
||||
@@ -11,39 +11,31 @@ namespace Microsoft.Agents.AI.Foundry;
|
||||
/// <see cref="ClientHeadersPolicy"/> running inside the SCM transport pipeline.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// AsyncLocal flows the value into downstream awaits but does not roll the value back when the
|
||||
/// setting method returns. This type pairs each <see cref="Push(IReadOnlyDictionary{string, string}?)"/>
|
||||
/// with a disposable that explicitly restores the prior value, giving stack-style LIFO semantics
|
||||
/// for nested or sequential per-call scopes on the same async flow.
|
||||
/// <para>
|
||||
/// <see cref="AsyncLocal{T}"/> propagates the value forward into every <c>await</c> on the same
|
||||
/// async flow, but mutations made inside an awaited <c>async</c> method do <em>not</em> leak back
|
||||
/// to the caller after the method returns. This means a method that assigns
|
||||
/// <see cref="Current"/> at the top and then awaits inner work does not need any explicit
|
||||
/// restoration step: the runtime restores the caller's view of the AsyncLocal automatically when
|
||||
/// the method's task completes.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Setting <see cref="Current"/> from synchronous code, however, will leak to the caller because
|
||||
/// no async-method boundary is crossed. All Agent Framework call sites of this carrier are
|
||||
/// inside <c>async</c> methods (<see cref="ClientHeadersAgent"/>), so the natural restoration
|
||||
/// suffices for our needs.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
internal static class ClientHeadersScope
|
||||
{
|
||||
private static readonly AsyncLocal<IReadOnlyDictionary<string, string>?> s_current = new();
|
||||
|
||||
/// <summary>Gets the dictionary captured by the most recent <see cref="Push(IReadOnlyDictionary{string, string}?)"/> on this async flow.</summary>
|
||||
public static IReadOnlyDictionary<string, string>? Current => s_current.Value;
|
||||
|
||||
/// <summary>
|
||||
/// Pushes a new value as the current scope. Disposing the returned token restores the previous value.
|
||||
/// Gets or sets the per-async-flow client-header snapshot read by <see cref="ClientHeadersPolicy"/>.
|
||||
/// </summary>
|
||||
/// <param name="headers">The header dictionary to surface to the policy. May be <see langword="null"/>.</param>
|
||||
public static Scope Push(IReadOnlyDictionary<string, string>? headers)
|
||||
public static IReadOnlyDictionary<string, string>? Current
|
||||
{
|
||||
var previous = s_current.Value;
|
||||
s_current.Value = headers;
|
||||
return new Scope(previous);
|
||||
}
|
||||
|
||||
/// <summary>Disposable token that restores the previous scope on <see cref="Dispose"/>.</summary>
|
||||
internal readonly struct Scope : System.IDisposable
|
||||
{
|
||||
private readonly IReadOnlyDictionary<string, string>? _previous;
|
||||
|
||||
internal Scope(IReadOnlyDictionary<string, string>? previous)
|
||||
{
|
||||
this._previous = previous;
|
||||
}
|
||||
|
||||
public void Dispose() => s_current.Value = this._previous;
|
||||
get => s_current.Value;
|
||||
set => s_current.Value = value;
|
||||
}
|
||||
}
|
||||
|
||||
+18
-11
@@ -245,29 +245,33 @@ public sealed class ClientHeadersExtensionsTests
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------------------------
|
||||
// 10. ClientHeadersScope.Push is LIFO and AsyncLocal-isolated (parallel runs don't leak)
|
||||
// 10. ClientHeadersScope is AsyncLocal-isolated across parallel runs and auto-restores on
|
||||
// async-method return (no explicit Dispose needed).
|
||||
// -------------------------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public async Task ClientHeadersScope_IsLifoAndAsyncLocalIsolatedAsync()
|
||||
public async Task ClientHeadersScope_IsAsyncLocalIsolatedAndAutoRestoresAsync()
|
||||
{
|
||||
// Arrange
|
||||
var dictA = new Dictionary<string, string> { ["x-client-end-user-id"] = "alice" };
|
||||
var dictB = new Dictionary<string, string> { ["x-client-end-user-id"] = "bob" };
|
||||
|
||||
// Act / Assert
|
||||
// Act / Assert: parallel async flows do not see each other's mutations.
|
||||
await Task.WhenAll(
|
||||
ProbeAsync(dictA, "alice"),
|
||||
ProbeAsync(dictB, "bob"));
|
||||
|
||||
async Task ProbeAsync(Dictionary<string, string> dict, string expected)
|
||||
{
|
||||
using (ClientHeadersScope.Push(dict))
|
||||
{
|
||||
await Task.Yield();
|
||||
Assert.Equal(expected, ClientHeadersScope.Current!["x-client-end-user-id"]);
|
||||
}
|
||||
ClientHeadersScope.Current = dict;
|
||||
await Task.Yield();
|
||||
Assert.Equal(expected, ClientHeadersScope.Current!["x-client-end-user-id"]);
|
||||
}
|
||||
|
||||
// Assert: setting Current inside an awaited async method does not leak back to the caller
|
||||
// after the method returns. This is the AsyncLocal natural-restoration behavior the
|
||||
// ClientHeadersAgent relies on.
|
||||
Assert.Null(ClientHeadersScope.Current);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------------------------
|
||||
@@ -320,16 +324,19 @@ public sealed class ClientHeadersExtensionsTests
|
||||
perTryPolicies: default,
|
||||
beforeTransportPolicies: default);
|
||||
|
||||
var perCall = new Dictionary<string, string> { ["x-client-end-user-id"] = "alice" };
|
||||
|
||||
// Act
|
||||
using (ClientHeadersScope.Push(perCall))
|
||||
ClientHeadersScope.Current = new Dictionary<string, string> { ["x-client-end-user-id"] = "alice" };
|
||||
try
|
||||
{
|
||||
var msg = pipeline.CreateMessage();
|
||||
msg.Request.Method = "GET";
|
||||
msg.Request.Uri = new Uri("https://example.test/");
|
||||
await pipeline.SendAsync(msg);
|
||||
}
|
||||
finally
|
||||
{
|
||||
ClientHeadersScope.Current = null;
|
||||
}
|
||||
|
||||
// Assert: the per-call value won.
|
||||
Assert.Equal("alice", handler.Headers["x-client-end-user-id"]);
|
||||
|
||||
Reference in New Issue
Block a user