mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
.NET: Implement browse/clear functionality for state (#513)
* feat: Implement browse/clear functionality for state * fix: Assertion comments in StateManagerTests
This commit is contained in:
committed by
GitHub
Unverified
parent
1134350fcf
commit
80e89a7d87
@@ -27,6 +27,73 @@ internal class StateManager
|
||||
return scope;
|
||||
}
|
||||
|
||||
private IEnumerable<UpdateKey> GetUpdatesForScopeStrict(ScopeId scopeId)
|
||||
{
|
||||
Throw.IfNull(scopeId);
|
||||
|
||||
return this._queuedUpdates.Keys.Where(key => key.IsMatchingScope(scopeId, strict: true));
|
||||
}
|
||||
|
||||
public ValueTask ClearStateAsync(string executorId, string? scopeName)
|
||||
=> this.ClearStateAsync(new ScopeId(Throw.IfNullOrEmpty(executorId), scopeName));
|
||||
|
||||
public async ValueTask ClearStateAsync(ScopeId scopeId)
|
||||
{
|
||||
Throw.IfNull(scopeId);
|
||||
|
||||
if (this._scopes.TryGetValue(scopeId, out StateScope? scope))
|
||||
{
|
||||
HashSet<string> keysToDelete = await scope.ReadKeysAsync().ConfigureAwait(false);
|
||||
|
||||
foreach (UpdateKey updateKey in this.GetUpdatesForScopeStrict(scopeId))
|
||||
{
|
||||
StateUpdate update = this._queuedUpdates[updateKey];
|
||||
if (!update.IsDelete)
|
||||
{
|
||||
this._queuedUpdates[updateKey] = StateUpdate.Delete(update.Key);
|
||||
}
|
||||
|
||||
keysToDelete.Remove(update.Key);
|
||||
}
|
||||
|
||||
foreach (string key in keysToDelete)
|
||||
{
|
||||
UpdateKey updateKey = new(scopeId, key);
|
||||
this._queuedUpdates[updateKey] = StateUpdate.Delete(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private HashSet<string> ApplyUnpublishedUpdates(ScopeId scopeId, HashSet<string> keys)
|
||||
{
|
||||
// Apply any queued updates for this scope
|
||||
foreach (UpdateKey key in this.GetUpdatesForScopeStrict(scopeId))
|
||||
{
|
||||
StateUpdate update = this._queuedUpdates[key];
|
||||
if (update.IsDelete)
|
||||
{
|
||||
keys.Remove(update.Key);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Add is idempotent on Sets
|
||||
keys.Add(update.Key);
|
||||
}
|
||||
}
|
||||
|
||||
return keys;
|
||||
}
|
||||
|
||||
public ValueTask<HashSet<string>> ReadKeysAsync(string executorId, string? scopeName = null)
|
||||
=> this.ReadKeysAsync(new ScopeId(Throw.IfNullOrEmpty(executorId), scopeName));
|
||||
|
||||
public async ValueTask<HashSet<string>> ReadKeysAsync(ScopeId scopeId)
|
||||
{
|
||||
StateScope scope = this.GetOrCreateScope(scopeId);
|
||||
HashSet<string> keys = await scope.ReadKeysAsync().ConfigureAwait(false);
|
||||
return this.ApplyUnpublishedUpdates(scopeId, keys);
|
||||
}
|
||||
|
||||
public ValueTask<T?> ReadStateAsync<T>(string executorId, string? scopeName, string key)
|
||||
=> this.ReadStateAsync<T>(new ScopeId(Throw.IfNullOrEmpty(executorId), scopeName), key);
|
||||
|
||||
@@ -91,7 +158,7 @@ internal class StateManager
|
||||
stateUpdates.Add(this._queuedUpdates[key]);
|
||||
}
|
||||
|
||||
if (updatesByScope.Count > 0 && tracer != null)
|
||||
if (tracer != null && (updatesByScope.Count > 0))
|
||||
{
|
||||
tracer.TraceStatePublished();
|
||||
}
|
||||
|
||||
@@ -23,6 +23,13 @@ internal class StateScope
|
||||
{
|
||||
}
|
||||
|
||||
public ValueTask<HashSet<string>> ReadKeysAsync()
|
||||
{
|
||||
HashSet<string> keys = new(this._stateData.Keys, this._stateData.Comparer);
|
||||
|
||||
return new(keys);
|
||||
}
|
||||
|
||||
public ValueTask<T?> ReadStateAsync<T>(string key)
|
||||
{
|
||||
Throw.IfNullOrEmpty(key);
|
||||
@@ -40,7 +47,7 @@ internal class StateScope
|
||||
|
||||
foreach (string key in updates.Keys)
|
||||
{
|
||||
if (updates[key].Count == 0)
|
||||
if (updates == null || updates[key].Count == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
@@ -50,14 +57,14 @@ internal class StateScope
|
||||
throw new InvalidOperationException($"Expected exactly one update for key '{key}'.");
|
||||
}
|
||||
|
||||
StateUpdate upadte = updates[key][0];
|
||||
if (upadte.IsDelete)
|
||||
StateUpdate update = updates[key][0];
|
||||
if (update.IsDelete)
|
||||
{
|
||||
this._stateData.Remove(key);
|
||||
}
|
||||
else
|
||||
{
|
||||
this._stateData[key] = upadte.Value!;
|
||||
this._stateData[key] = update.Value!;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -29,13 +29,17 @@ internal class UpdateKey(ScopeId scopeId, string key)
|
||||
return $"{this.ScopeId}/{this.Key}";
|
||||
}
|
||||
|
||||
public bool IsMatchingScope(ScopeId scopeId, bool strict = false)
|
||||
{
|
||||
return this.ScopeId == scopeId && (!strict || this.ScopeId.ExecutorId == scopeId.ExecutorId);
|
||||
}
|
||||
|
||||
public override bool Equals(object? obj)
|
||||
{
|
||||
if (obj is UpdateKey other)
|
||||
{
|
||||
// Unlike ScopeId, UpdateKey is equal only if both the Executor and ScopeName are the same
|
||||
return this.ScopeId.ExecutorId == other.ScopeId.ExecutorId &&
|
||||
this.ScopeId.ScopeName == other.ScopeId.ScopeName &&
|
||||
return this.IsMatchingScope(other.ScopeId, strict: true) &&
|
||||
this.Key == other.Key;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Microsoft.Agents.Workflows;
|
||||
@@ -33,10 +34,18 @@ public interface IWorkflowContext
|
||||
/// </summary>
|
||||
/// <typeparam name="T">The type of the state value.</typeparam>
|
||||
/// <param name="key">The key of the state value.</param>
|
||||
/// <param name="scopeName">The name of the scope.</param>
|
||||
/// <param name = "scopeName" > An optional name that specifies the scope to read.If null, the default scope is
|
||||
/// used.</param>
|
||||
/// <returns>A <see cref="ValueTask{T}"/> representing the asynchronous operation.</returns>
|
||||
ValueTask<T?> ReadStateAsync<T>(string key, string? scopeName = null);
|
||||
|
||||
/// <summary>
|
||||
/// Asynchronously reads all state keys within the specified scope.
|
||||
/// </summary>
|
||||
/// <param name="scopeName">An optional name that specifies the scope to read. If null, the default scope is
|
||||
/// used.</param>
|
||||
ValueTask<HashSet<string>> ReadStateKeysAsync(string? scopeName = null);
|
||||
|
||||
/// <summary>
|
||||
/// Asynchronously updates the state of a queue entry identified by the specified key and optional scope.
|
||||
/// </summary>
|
||||
@@ -48,8 +57,21 @@ public interface IWorkflowContext
|
||||
/// <param name="key">The unique identifier for the queue entry to update. Cannot be null or empty.</param>
|
||||
/// <param name="value">The value to set for the queue entry. If null, the entry's state may be cleared or reset depending on
|
||||
/// implementation.</param>
|
||||
/// <param name="scopeName">An optional name that specifies the scope within which the queue entry resides. If null, the default scope is
|
||||
/// <param name="scopeName">An optional name that specifies the scope to update. If null, the default scope is
|
||||
/// used.</param>
|
||||
/// <returns>A ValueTask that represents the asynchronous update operation.</returns>
|
||||
ValueTask QueueStateUpdateAsync<T>(string key, T? value, string? scopeName = null);
|
||||
|
||||
/// <summary>
|
||||
/// Asynchronously clears all state entries within the specified scope.
|
||||
///
|
||||
/// This semantically equivalent to retrieving all keys in the scope and deleting them one-by-one.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Subsequent reads by this executor will not find any entries in the cleared scope. Other executors will only
|
||||
/// see the cleared state starting from the next SuperStep.
|
||||
/// </remarks>
|
||||
/// <param name="scopeName">An optional name that specifies the scope to clear. If null, the default scope is used.</param>
|
||||
/// <returns>A ValueTask that represents the asynchronous clear operation.</returns>
|
||||
ValueTask QueueClearScopeAsync(string? scopeName = null);
|
||||
}
|
||||
|
||||
@@ -105,11 +105,17 @@ internal class InProcessRunnerContext<TExternalInput> : IRunnerContext
|
||||
public ValueTask AddEventAsync(WorkflowEvent workflowEvent) => RunnerContext.AddEventAsync(workflowEvent);
|
||||
public ValueTask SendMessageAsync(object message, string? targetId = null) => RunnerContext.SendMessageAsync(ExecutorId, message, targetId);
|
||||
|
||||
public ValueTask<T?> ReadStateAsync<T>(string key, string? scopeName = null)
|
||||
=> RunnerContext.StateManager.ReadStateAsync<T>(ExecutorId, scopeName, key);
|
||||
|
||||
public ValueTask<HashSet<string>> ReadStateKeysAsync(string? scopeName = null)
|
||||
=> RunnerContext.StateManager.ReadKeysAsync(ExecutorId, scopeName);
|
||||
|
||||
public ValueTask QueueStateUpdateAsync<T>(string key, T? value, string? scopeName = null)
|
||||
=> RunnerContext.StateManager.WriteStateAsync(ExecutorId, scopeName, key, value);
|
||||
|
||||
public ValueTask<T?> ReadStateAsync<T>(string key, string? scopeName = null)
|
||||
=> RunnerContext.StateManager.ReadStateAsync<T>(ExecutorId, scopeName, key);
|
||||
public ValueTask QueueClearScopeAsync(string? scopeName = null)
|
||||
=> RunnerContext.StateManager.ClearStateAsync(ExecutorId, scopeName);
|
||||
}
|
||||
|
||||
internal Task PrepareForCheckpointAsync(CancellationToken cancellation = default)
|
||||
|
||||
@@ -52,6 +52,28 @@ public class ScopeId(string executorId, string? scopeName = null)
|
||||
return false;
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public static bool operator ==(ScopeId? left, ScopeId? right)
|
||||
{
|
||||
if (left is null && right == null)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if (right is null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
// The inversion here is necessary because the null analysis is incapable of proving to itself
|
||||
// that left cannot be null here: If it was, either right is null, and we returned true, or right
|
||||
// is not null, and we returned false.
|
||||
return right.Equals(left);
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public static bool operator !=(ScopeId? left, ScopeId? right) => !(left == right);
|
||||
|
||||
/// <inheritdoc/>
|
||||
public override int GetHashCode()
|
||||
{
|
||||
|
||||
@@ -0,0 +1,97 @@
|
||||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
using FluentAssertions;
|
||||
using Microsoft.Agents.Workflows.Execution;
|
||||
|
||||
namespace Microsoft.Agents.Workflows.UnitTests;
|
||||
|
||||
public class StateKeyObjectTests
|
||||
{
|
||||
[Fact]
|
||||
public void Test_ScopeId_Equality()
|
||||
{
|
||||
// The rules of ScopeId are simple: Private executor scopes (executorId, scopeId=null) are only equal to
|
||||
// themselves. Public ScopeIds are equal when their scopeNames are equal, regardless of executorId.
|
||||
|
||||
ScopeId privateScope1 = new("executor1", null);
|
||||
ScopeId privateScope2 = new("executor2", null);
|
||||
|
||||
Assert.NotEqual(privateScope1, privateScope2);
|
||||
Assert.Equal(privateScope1, new ScopeId("executor1", null));
|
||||
|
||||
ScopeId sharedScope1 = new("executor1", "sharedScope");
|
||||
ScopeId sharedScope2 = new("executor2", "sharedScope");
|
||||
|
||||
Assert.Equal(sharedScope1, sharedScope2);
|
||||
Assert.NotEqual(sharedScope1, new ScopeId("executor1", "differentScope"));
|
||||
Assert.NotEqual(sharedScope1, privateScope1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_UpdateKey_Equality()
|
||||
{
|
||||
// The rules of UpdateKey are different from ScopeId. In the case of "shared scope",
|
||||
// two update keys with different ExecutorIds are not the same.
|
||||
|
||||
const string Key1 = "key1";
|
||||
const string Key2 = "key2";
|
||||
UpdateKey privateScope1Key = new("executor1", null, Key1);
|
||||
UpdateKey privateScope1Key2 = new("executor1", null, Key2);
|
||||
|
||||
Assert.NotEqual(privateScope1Key, privateScope1Key2);
|
||||
|
||||
UpdateKey privateScope2Key = new("executor2", null, Key1);
|
||||
|
||||
Assert.NotEqual(privateScope1Key, privateScope2Key);
|
||||
|
||||
UpdateKey scope1Executor1Key = new("executor1", "sharedScope", Key1);
|
||||
UpdateKey scope1Executor2Key = new("executor2", "sharedScope", Key1);
|
||||
|
||||
Assert.NotEqual(scope1Executor1Key, scope1Executor2Key);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_UpdateKey_IsMatchingScope()
|
||||
{
|
||||
const string Key1 = "key1";
|
||||
|
||||
UpdateKey privateScope1Key = new("executor1", null, Key1);
|
||||
UpdateKey privateScope2Key = new("executor2", null, Key1);
|
||||
|
||||
ScopeId privateScope1 = new("executor1", null);
|
||||
ScopeId privateScope2 = new("executor2", null);
|
||||
|
||||
ValidateMatch(privateScope1Key, privateScope1, expectedStrict: true, expectedLoose: true);
|
||||
ValidateMatch(privateScope1Key, privateScope2, expectedStrict: false, expectedLoose: false);
|
||||
ValidateMatch(privateScope2Key, privateScope1, expectedStrict: false, expectedLoose: false);
|
||||
ValidateMatch(privateScope2Key, privateScope2, expectedStrict: true, expectedLoose: true);
|
||||
|
||||
UpdateKey sharedScope1Key = new("executor1", "sharedScope", Key1);
|
||||
UpdateKey sharedScope2Key = new("executor2", "sharedScope", Key1);
|
||||
|
||||
ScopeId sharedScope1 = new("executor1", "sharedScope");
|
||||
ScopeId sharedScope2 = new("executor2", "sharedScope");
|
||||
|
||||
ValidateMatch(sharedScope1Key, sharedScope1, expectedStrict: true, expectedLoose: true);
|
||||
ValidateMatch(sharedScope1Key, sharedScope2, expectedStrict: false, expectedLoose: true);
|
||||
ValidateMatch(sharedScope2Key, sharedScope1, expectedStrict: false, expectedLoose: true);
|
||||
ValidateMatch(sharedScope2Key, sharedScope2, expectedStrict: true, expectedLoose: true);
|
||||
|
||||
// Cross checks between private and shared scopes should never match
|
||||
ValidateMatch(privateScope1Key, sharedScope1, expectedStrict: false, expectedLoose: false);
|
||||
ValidateMatch(privateScope1Key, sharedScope2, expectedStrict: false, expectedLoose: false);
|
||||
ValidateMatch(privateScope2Key, sharedScope1, expectedStrict: false, expectedLoose: false);
|
||||
ValidateMatch(privateScope2Key, sharedScope2, expectedStrict: false, expectedLoose: false);
|
||||
|
||||
ValidateMatch(sharedScope1Key, privateScope1, expectedStrict: false, expectedLoose: false);
|
||||
ValidateMatch(sharedScope1Key, privateScope2, expectedStrict: false, expectedLoose: false);
|
||||
ValidateMatch(sharedScope2Key, privateScope1, expectedStrict: false, expectedLoose: false);
|
||||
ValidateMatch(sharedScope2Key, privateScope2, expectedStrict: false, expectedLoose: false);
|
||||
|
||||
void ValidateMatch(UpdateKey key, ScopeId scope, bool expectedStrict, bool expectedLoose)
|
||||
{
|
||||
key.IsMatchingScope(scope, strict: true).Should().Be(expectedStrict);
|
||||
key.IsMatchingScope(scope, strict: false).Should().Be(expectedLoose);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,454 @@
|
||||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
using FluentAssertions;
|
||||
using Microsoft.Agents.Workflows.Execution;
|
||||
|
||||
namespace Microsoft.Agents.Workflows.UnitTests;
|
||||
|
||||
public class StateManagerTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Test_SharedScope_ReadKeysAsync()
|
||||
{
|
||||
const string? ScopeName = "sharedScope";
|
||||
await RunScopeKeysTestAsync(ScopeName, isSharedScope: true);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Test_PrivateScope_ReadKeysAsync()
|
||||
{
|
||||
const string? ScopeName = null;
|
||||
await RunScopeKeysTestAsync(ScopeName, isSharedScope: false);
|
||||
}
|
||||
|
||||
private static async Task RunScopeKeysTestAsync(string? scopeName, bool isSharedScope)
|
||||
{
|
||||
const string SelfExecutorId = "executor1";
|
||||
const string OtherExecutorId = "executor2";
|
||||
const string Key1 = "key1";
|
||||
HashSet<string> ExpectedAfterWrite = [Key1];
|
||||
|
||||
StateManager manager = new();
|
||||
ScopeId sharedScopeSelfView = new(SelfExecutorId, scopeName);
|
||||
ScopeId sharedScopeOtherView = new(OtherExecutorId, scopeName);
|
||||
|
||||
// Assert baseline: neither executor sees any keys
|
||||
HashSet<string> selfKeys = await manager.ReadKeysAsync(sharedScopeSelfView);
|
||||
selfKeys.Should().BeEmpty("there should be no keys in an empty StateManager");
|
||||
|
||||
HashSet<string> otherKeys = await manager.ReadKeysAsync(sharedScopeOtherView);
|
||||
otherKeys.Should().BeEmpty("there should be no keys in an empty StateManager");
|
||||
|
||||
// Act 1: Write a key from the self executor's view of the shared scope
|
||||
|
||||
await manager.WriteStateAsync(sharedScopeSelfView, Key1, "value1");
|
||||
|
||||
// Assert 1: The self executor should see the key immediately, but the other executor should not
|
||||
selfKeys = await manager.ReadKeysAsync(sharedScopeSelfView);
|
||||
selfKeys.SetEquals(ExpectedAfterWrite).Should().BeTrue("writes should be visible immediately to the writing executor");
|
||||
|
||||
otherKeys = await manager.ReadKeysAsync(sharedScopeOtherView);
|
||||
otherKeys.Should().BeEmpty(isSharedScope ? "writes should not be visible to other executors until published"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
// Act 2: Publish the updates
|
||||
await manager.PublishUpdatesAsync(tracer: null);
|
||||
|
||||
// Assert 2: Both executors should see the key now, if sharedScope
|
||||
selfKeys = await manager.ReadKeysAsync(sharedScopeSelfView);
|
||||
selfKeys.SetEquals(ExpectedAfterWrite).Should().BeTrue("published writes should be visible to all executors");
|
||||
|
||||
otherKeys = await manager.ReadKeysAsync(sharedScopeOtherView);
|
||||
|
||||
if (isSharedScope)
|
||||
{
|
||||
otherKeys.SetEquals(ExpectedAfterWrite).Should().BeTrue("published writes should be visible to all executors");
|
||||
}
|
||||
else
|
||||
{
|
||||
otherKeys.Should().BeEmpty("writes to private scopes should not be visible across executors");
|
||||
}
|
||||
|
||||
// Act 3: Clear the state from the self executor's view of the shared scope
|
||||
await manager.WriteStateAsync<string?>(sharedScopeSelfView, Key1, null);
|
||||
|
||||
// Assert 3: The self executor should not see the key immediately, but the other executor should still see it if sharedScope
|
||||
selfKeys = await manager.ReadKeysAsync(sharedScopeSelfView);
|
||||
selfKeys.Should().BeEmpty("deletes should be visible immediately to the writing executor");
|
||||
|
||||
otherKeys = await manager.ReadKeysAsync(sharedScopeOtherView);
|
||||
if (isSharedScope)
|
||||
{
|
||||
otherKeys.SetEquals(ExpectedAfterWrite).Should().BeTrue("published writes should be visible to all executors");
|
||||
}
|
||||
else
|
||||
{
|
||||
otherKeys.Should().BeEmpty("writes to private scopes should not be visible across executors");
|
||||
}
|
||||
|
||||
// Act 4: Publish the updates
|
||||
await manager.PublishUpdatesAsync(tracer: null);
|
||||
|
||||
// Assert 4: Neither executor should see the key now
|
||||
selfKeys = await manager.ReadKeysAsync(sharedScopeSelfView);
|
||||
selfKeys.Should().BeEmpty("published deletes should be visible to all executors");
|
||||
|
||||
otherKeys = await manager.ReadKeysAsync(sharedScopeOtherView);
|
||||
otherKeys.Should().BeEmpty(isSharedScope ? "published deletes should be visible to all executors"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Test_SharedScope_ValueLifecycleAsync()
|
||||
{
|
||||
const string? ScopeName = "sharedScope";
|
||||
await RunValueLifecycleTestAsync(ScopeName, isSharedScope: true);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Test_PrivateScope_ValueLifecycleAsync()
|
||||
{
|
||||
const string? ScopeName = null;
|
||||
await RunValueLifecycleTestAsync(ScopeName, isSharedScope: false);
|
||||
}
|
||||
|
||||
private static async Task RunValueLifecycleTestAsync(string? scopeName, bool isSharedScope)
|
||||
{
|
||||
const string SelfExecutorId = "executor1";
|
||||
const string OtherExecutorId = "executor2";
|
||||
const string Key1 = "key1", Key2 = "key2";
|
||||
const string Value1 = "value1", Value2 = "value2";
|
||||
|
||||
StateManager manager = new();
|
||||
ScopeId scopeSelfView = new(SelfExecutorId, scopeName);
|
||||
ScopeId scopeOtherView = new(OtherExecutorId, scopeName);
|
||||
|
||||
isSharedScope.Should().Be(scopeSelfView == scopeOtherView);
|
||||
|
||||
// Assert baseline: neither executor sees any keys or values
|
||||
string? selfValue1 = await manager.ReadStateAsync<string>(scopeSelfView, Key1);
|
||||
string? selfValue2 = await manager.ReadStateAsync<string>(scopeSelfView, Key2);
|
||||
selfValue1.Should().BeNull("there should be no values in an empty StateManager");
|
||||
selfValue2.Should().BeNull("there should be no values in an empty StateManager");
|
||||
|
||||
string? otherValue1 = await manager.ReadStateAsync<string>(scopeOtherView, Key1);
|
||||
string? otherValue2 = await manager.ReadStateAsync<string>(scopeOtherView, Key2);
|
||||
otherValue1.Should().BeNull("there should be no values in an empty StateManager");
|
||||
otherValue2.Should().BeNull("there should be no values in an empty StateManager");
|
||||
|
||||
// Act 1: Write a value from the self executor's view of the shared scope
|
||||
await manager.WriteStateAsync(scopeSelfView, Key1, Value1);
|
||||
|
||||
// Assert 1: The self executor should see the value immediately, but the other executor should not
|
||||
selfValue1 = await manager.ReadStateAsync<string>(scopeSelfView, Key1);
|
||||
selfValue1.Should().Be(Value1, "writes should be visible immediately to the writing executor");
|
||||
|
||||
selfValue2 = await manager.ReadStateAsync<string>(scopeSelfView, Key2);
|
||||
selfValue2.Should().BeNull("uninvolved keys' state/value should not change after a write");
|
||||
|
||||
otherValue1 = await manager.ReadStateAsync<string>(scopeOtherView, Key1);
|
||||
otherValue1.Should().BeNull(isSharedScope ? "writes should not be visible to other executors until published (key1: written by self, read by other)"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
otherValue2 = await manager.ReadStateAsync<string>(scopeOtherView, Key2);
|
||||
otherValue2.Should().BeNull("uninvolved keys' state/value should not change after a write");
|
||||
|
||||
// Act 2: Write a value from the other executor's view of the shared scope
|
||||
await manager.WriteStateAsync(scopeOtherView, Key2, Value2);
|
||||
|
||||
// Assert 2: The other executor should see the value immediately, but the self executor should not
|
||||
selfValue1 = await manager.ReadStateAsync<string>(scopeSelfView, Key1);
|
||||
selfValue1.Should().Be(Value1, "uninvolved keys' state/value should not change after a write");
|
||||
|
||||
selfValue2 = await manager.ReadStateAsync<string>(scopeSelfView, Key2);
|
||||
selfValue2.Should().BeNull(isSharedScope ? "writes should not be visible to other executors until published (key2: written by other, read by self)"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
otherValue1 = await manager.ReadStateAsync<string>(scopeOtherView, Key1);
|
||||
otherValue1.Should().BeNull(isSharedScope ? "writes should not be visible to other executors until published (key1: written by self, read by other)"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
otherValue2 = await manager.ReadStateAsync<string>(scopeOtherView, Key2);
|
||||
otherValue2.Should().Be(Value2, "writes should be visible immediately to the writing executor");
|
||||
|
||||
// Act 3: Publish the updates
|
||||
await manager.PublishUpdatesAsync(tracer: null);
|
||||
|
||||
// Assert 3: Both executors should see both values now, if the scope is shared
|
||||
selfValue1 = await manager.ReadStateAsync<string>(scopeSelfView, Key1);
|
||||
selfValue1.Should().Be(Value1, "published writes should be visible to all executors (key1: written by self, read by self)");
|
||||
|
||||
selfValue2 = await manager.ReadStateAsync<string>(scopeSelfView, Key2);
|
||||
if (isSharedScope)
|
||||
{
|
||||
selfValue2.Should().Be(Value2, "published writes should be visible to all executors (key2: written by other, read by self)");
|
||||
}
|
||||
else
|
||||
{
|
||||
selfValue2.Should().BeNull("writes to private scopes should not be visible across executors");
|
||||
}
|
||||
|
||||
otherValue1 = await manager.ReadStateAsync<string>(scopeOtherView, Key1);
|
||||
if (isSharedScope)
|
||||
{
|
||||
otherValue1.Should().Be(Value1, "published writes should be visible to all executors (key1: written by self, read by other)");
|
||||
}
|
||||
else
|
||||
{
|
||||
otherValue1.Should().BeNull("writes to private scopes should not be visible across executors");
|
||||
}
|
||||
|
||||
otherValue2 = await manager.ReadStateAsync<string>(scopeOtherView, Key2);
|
||||
otherValue2.Should().Be(Value2, "published writes should be visible to all executors (key2: written by other, read by other)");
|
||||
|
||||
// Act 4: Clear the value from the self executor's view of the shared scope
|
||||
await manager.ClearStateAsync(scopeSelfView);
|
||||
|
||||
// Assert 4: The self executor should not see either value immediately, but the other executor should still see both
|
||||
selfValue1 = await manager.ReadStateAsync<string>(scopeSelfView, Key1);
|
||||
selfValue1.Should().BeNull("clears should be visible immediately to the writing executor");
|
||||
|
||||
selfValue2 = await manager.ReadStateAsync<string>(scopeSelfView, Key2);
|
||||
selfValue2.Should().BeNull(isSharedScope ? "clears should be visible immediately to the writing executor"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
otherValue1 = await manager.ReadStateAsync<string>(scopeOtherView, Key1);
|
||||
if (isSharedScope)
|
||||
{
|
||||
otherValue1.Should().Be(Value1, "clears should not be visible to other executors until published (key2: written by self, read by other)");
|
||||
}
|
||||
else
|
||||
{
|
||||
otherValue1.Should().BeNull("writes to private scopes should not be visible across executors");
|
||||
}
|
||||
|
||||
otherValue2 = await manager.ReadStateAsync<string>(scopeOtherView, Key2);
|
||||
otherValue2.Should().Be(Value2, isSharedScope ? "clears should not be visible to other executors until published (key2: written by self, read by other)"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
// Act 5: Publish the updates
|
||||
await manager.PublishUpdatesAsync(tracer: null);
|
||||
|
||||
// Assert 5: Neither executor should see either value now
|
||||
selfValue1 = await manager.ReadStateAsync<string>(scopeSelfView, Key1);
|
||||
selfValue1.Should().BeNull("published clears should be visible to all executors");
|
||||
|
||||
selfValue2 = await manager.ReadStateAsync<string>(scopeSelfView, Key2);
|
||||
selfValue2.Should().BeNull(isSharedScope ? "published clears should be visible to all executors"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
otherValue1 = await manager.ReadStateAsync<string>(scopeOtherView, Key1);
|
||||
otherValue1.Should().BeNull(isSharedScope ? "published clears should be visible to all executors"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
otherValue2 = await manager.ReadStateAsync<string>(scopeOtherView, Key2);
|
||||
if (isSharedScope)
|
||||
{
|
||||
otherValue2.Should().BeNull("published clears should be visible to all executors");
|
||||
}
|
||||
else
|
||||
{
|
||||
otherValue2.Should().Be(Value2, "writes to private scopes should not be visible across executors");
|
||||
}
|
||||
|
||||
// Restore the written state of both keys
|
||||
await manager.WriteStateAsync(scopeSelfView, Key1, Value1);
|
||||
await manager.WriteStateAsync(scopeOtherView, Key2, Value2);
|
||||
await manager.PublishUpdatesAsync(tracer: null);
|
||||
|
||||
// Act 6: Delete Key1 from the other executor's view of the shared scope
|
||||
await manager.WriteStateAsync<string?>(scopeOtherView, Key1, null);
|
||||
|
||||
// Assert 6: The other executor should not see Key1 immediately, but should still see Key2. The self executor should still see both.
|
||||
selfValue1 = await manager.ReadStateAsync<string>(scopeSelfView, Key1);
|
||||
selfValue1.Should().Be(Value1, isSharedScope ? "deletes should not be visible to other executors until published (key1: written by other, read by self)"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
selfValue2 = await manager.ReadStateAsync<string>(scopeSelfView, Key2);
|
||||
if (isSharedScope)
|
||||
{
|
||||
selfValue2.Should().Be(Value2, "uninvolved keys' state/value should not change after a delete");
|
||||
}
|
||||
else
|
||||
{
|
||||
selfValue2.Should().BeNull("writes to private scopes should not be visible across executors");
|
||||
}
|
||||
|
||||
otherValue1 = await manager.ReadStateAsync<string>(scopeOtherView, Key1);
|
||||
otherValue1.Should().BeNull(isSharedScope ? "deletes should be visible immediately to the writing executor"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
otherValue2 = await manager.ReadStateAsync<string>(scopeOtherView, Key2);
|
||||
otherValue2.Should().Be(Value2, "uninvolved keys' state/value should not change after a delete");
|
||||
|
||||
// Act 7: Delete Key2 from the self executor's view of the shared scope
|
||||
await manager.WriteStateAsync<string?>(scopeSelfView, Key2, null);
|
||||
|
||||
// Assert 7: The self executor should not see Key2 immediately, but should still see Key1.
|
||||
// The other executor should not see Key1, but should still see Key2.
|
||||
selfValue1 = await manager.ReadStateAsync<string>(scopeSelfView, Key1);
|
||||
selfValue1.Should().Be(Value1, isSharedScope ? "deletes should not be visible to other executors until published (key1: written by other, read by self)"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
selfValue2 = await manager.ReadStateAsync<string>(scopeSelfView, Key2);
|
||||
selfValue2.Should().BeNull(isSharedScope ? "deletes should be visible immediately to the writing executor"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
otherValue1 = await manager.ReadStateAsync<string>(scopeOtherView, Key1);
|
||||
otherValue1.Should().BeNull(isSharedScope ? "deletes should be visible immediately to the writing executor"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
otherValue2 = await manager.ReadStateAsync<string>(scopeOtherView, Key2);
|
||||
otherValue2.Should().Be(Value2, isSharedScope ? "deletes should not be visible to other executors until published (key2: written by self, read by other)"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
// Act 8: Publish the updates
|
||||
await manager.PublishUpdatesAsync(tracer: null);
|
||||
|
||||
// Assert 8: Neither executor should see either value now
|
||||
selfValue1 = await manager.ReadStateAsync<string>(scopeSelfView, Key1);
|
||||
if (isSharedScope)
|
||||
{
|
||||
selfValue1.Should().BeNull("published deletes should be visible to all executors");
|
||||
}
|
||||
else
|
||||
{
|
||||
selfValue1.Should().Be(Value1, "writes to private scopes should not be visible across executors");
|
||||
}
|
||||
|
||||
selfValue2 = await manager.ReadStateAsync<string>(scopeSelfView, Key2);
|
||||
selfValue2.Should().BeNull(isSharedScope ? "published deletes should be visible to all executors"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
otherValue1 = await manager.ReadStateAsync<string>(scopeOtherView, Key1);
|
||||
otherValue1.Should().BeNull(isSharedScope ? "published deletes should be visible to all executors"
|
||||
: "writes to private scopes should not be visible across executors");
|
||||
|
||||
otherValue2 = await manager.ReadStateAsync<string>(scopeOtherView, Key2);
|
||||
if (isSharedScope)
|
||||
{
|
||||
otherValue2.Should().BeNull("published deletes should be visible to all executors");
|
||||
}
|
||||
else
|
||||
{
|
||||
otherValue2.Should().Be(Value2, "writes to private scopes should not be visible across executors");
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Test_SharedScope_ConflictingUpdatesAsync()
|
||||
{
|
||||
const string? ScopeName = "sharedScope";
|
||||
await RunConflictingUpdatesTest_WriteVsWriteAsync(ScopeName, isSharedScope: true);
|
||||
await RunConflictingUpdatesTest_WriteVsDeleteAsync(ScopeName, isSharedScope: true);
|
||||
await RunConflictingUpdatesTest_WriteVsClearAsync(ScopeName, isSharedScope: true);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Test_PrivateScope_ConflictingUpdatesAsync()
|
||||
{
|
||||
const string? ScopeName = null;
|
||||
await RunConflictingUpdatesTest_WriteVsWriteAsync(ScopeName, isSharedScope: false);
|
||||
await RunConflictingUpdatesTest_WriteVsDeleteAsync(ScopeName, isSharedScope: false);
|
||||
await RunConflictingUpdatesTest_WriteVsClearAsync(ScopeName, isSharedScope: false);
|
||||
}
|
||||
|
||||
private static async Task RunConflictingUpdatesTest_WriteVsWriteAsync(string? scopeName, bool isSharedScope)
|
||||
{
|
||||
const string SelfExecutorId = "executor1";
|
||||
const string OtherExecutorId = "executor2";
|
||||
const string Key1 = "key1";
|
||||
const string Value1 = "value", Value2 = "value";
|
||||
|
||||
// Arrange
|
||||
StateManager manager = new();
|
||||
ScopeId scopeSelfView = new(SelfExecutorId, scopeName);
|
||||
ScopeId scopeOtherView = new(OtherExecutorId, scopeName);
|
||||
isSharedScope.Should().Be(scopeSelfView == scopeOtherView);
|
||||
|
||||
// Act 1: Write a conflicting value from the self executor's view of the shared scope
|
||||
// Note that conflicting means update to the same key, not that the values are necessarily different.
|
||||
// We do not have any logic to resolve equivalent updates from different executors as idempotent.
|
||||
await manager.WriteStateAsync(scopeSelfView, Key1, Value1);
|
||||
await manager.WriteStateAsync(scopeOtherView, Key1, Value2);
|
||||
|
||||
Func<Task> act = async () => await manager.PublishUpdatesAsync(tracer: null);
|
||||
|
||||
if (isSharedScope)
|
||||
{
|
||||
await act.Should().ThrowAsync<InvalidOperationException>("conflicting writes to the same key should raise an exception when published");
|
||||
}
|
||||
else
|
||||
{
|
||||
await act.Should().NotThrowAsync("writes to private scopes should not be visible across executors");
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task RunConflictingUpdatesTest_WriteVsDeleteAsync(string? scopeName, bool isSharedScope)
|
||||
{
|
||||
const string SelfExecutorId = "executor1";
|
||||
const string OtherExecutorId = "executor2";
|
||||
const string Key1 = "key1", Key2 = "key2";
|
||||
const string Value1 = "value", Value2 = "value";
|
||||
|
||||
// Arrange
|
||||
StateManager manager = new();
|
||||
ScopeId scopeSelfView = new(SelfExecutorId, scopeName);
|
||||
ScopeId scopeOtherView = new(OtherExecutorId, scopeName);
|
||||
isSharedScope.Should().Be(scopeSelfView == scopeOtherView);
|
||||
|
||||
await manager.WriteStateAsync(scopeSelfView, Key1, Value1);
|
||||
await manager.WriteStateAsync(scopeOtherView, Key2, Value2);
|
||||
await manager.PublishUpdatesAsync(tracer: null);
|
||||
|
||||
// Act: Update the key from one executor and delete it from another
|
||||
await manager.WriteStateAsync(scopeSelfView, Key1, "newValue");
|
||||
await manager.WriteStateAsync<string>(scopeOtherView, Key1, null);
|
||||
Func<Task> act = async () => await manager.PublishUpdatesAsync(tracer: null);
|
||||
|
||||
if (isSharedScope)
|
||||
{
|
||||
await act.Should().ThrowAsync<InvalidOperationException>("conflicting writes (update vs delete) should raise an exception when published");
|
||||
}
|
||||
else
|
||||
{
|
||||
await act.Should().NotThrowAsync("writes to private scopes should not be visible across executors");
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task RunConflictingUpdatesTest_WriteVsClearAsync(string? scopeName, bool isSharedScope)
|
||||
{
|
||||
const string SelfExecutorId = "executor1";
|
||||
const string OtherExecutorId = "executor2";
|
||||
const string Key1 = "key1", Key2 = "key2";
|
||||
const string Value1 = "value", Value2 = "value";
|
||||
|
||||
// Arrange
|
||||
StateManager manager = new();
|
||||
ScopeId scopeSelfView = new(SelfExecutorId, scopeName);
|
||||
ScopeId scopeOtherView = new(OtherExecutorId, scopeName);
|
||||
isSharedScope.Should().Be(scopeSelfView == scopeOtherView);
|
||||
|
||||
await manager.WriteStateAsync(scopeSelfView, Key1, Value1);
|
||||
await manager.WriteStateAsync(scopeOtherView, Key2, Value2);
|
||||
await manager.PublishUpdatesAsync(tracer: null);
|
||||
|
||||
// Act: Update the key from one, and clear the entire scope from another
|
||||
await manager.WriteStateAsync(scopeSelfView, Key1, "newValue");
|
||||
await manager.ClearStateAsync(scopeOtherView);
|
||||
Func<Task> act = async () => await manager.PublishUpdatesAsync(tracer: null);
|
||||
|
||||
// Assert
|
||||
if (isSharedScope)
|
||||
{
|
||||
await act.Should().ThrowAsync<InvalidOperationException>("conflicting writes (update vs clear) should raise an exception when published");
|
||||
}
|
||||
else
|
||||
{
|
||||
await act.Should().NotThrowAsync("writes to private scopes should not be visible across executors");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,119 +0,0 @@
|
||||
// Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Agents.Workflows.Execution;
|
||||
|
||||
namespace Microsoft.Agents.Workflows.UnitTests;
|
||||
|
||||
public class StateSmokeTest
|
||||
{
|
||||
[Fact]
|
||||
public void Test_ScopeId_Equality()
|
||||
{
|
||||
// The rules of ScopeId are simple: Private executor scopes (executorId, scopeId=null) are only equal to
|
||||
// themselves. Public ScopeIds are equal when their scopeNames are equal, regardless of executorId.
|
||||
|
||||
ScopeId privateScope1 = new("executor1", null);
|
||||
ScopeId privateScope2 = new("executor2", null);
|
||||
|
||||
Assert.NotEqual(privateScope1, privateScope2);
|
||||
Assert.Equal(privateScope1, new ScopeId("executor1", null));
|
||||
|
||||
ScopeId sharedScope1 = new("executor1", "sharedScope");
|
||||
ScopeId sharedScope2 = new("executor2", "sharedScope");
|
||||
|
||||
Assert.Equal(sharedScope1, sharedScope2);
|
||||
Assert.NotEqual(sharedScope1, new ScopeId("executor1", "differentScope"));
|
||||
Assert.NotEqual(sharedScope1, privateScope1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Test_UpdateKey_Equality()
|
||||
{
|
||||
// The rules of UpdateKey are different from ScopeId. In the case of "shared scope",
|
||||
// two update keys with different ExecutorIds are not the same.
|
||||
|
||||
const string Key1 = "key1";
|
||||
const string Key2 = "key2";
|
||||
UpdateKey privateScope1Key = new("executor1", null, Key1);
|
||||
UpdateKey privateScope1Key2 = new("executor1", null, Key2);
|
||||
|
||||
Assert.NotEqual(privateScope1Key, privateScope1Key2);
|
||||
|
||||
UpdateKey privateScope2Key = new("executor2", null, Key1);
|
||||
|
||||
Assert.NotEqual(privateScope1Key, privateScope2Key);
|
||||
|
||||
UpdateKey scope1Executor1Key = new("executor1", "sharedScope", Key1);
|
||||
UpdateKey scope1Executor2Key = new("executor2", "sharedScope", Key1);
|
||||
|
||||
Assert.NotEqual(scope1Executor1Key, scope1Executor2Key);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Test_ReadQueueUpdateAsync()
|
||||
{
|
||||
ScopeId sharedScope1 = new("executor1", "sharedScope");
|
||||
ScopeId sharedScope2 = new("executor2", "sharedScope");
|
||||
|
||||
StateManager manager = new();
|
||||
|
||||
// Default reads on "object" should be null
|
||||
const string Key = "key1";
|
||||
Assert.Null(await manager.ReadStateAsync<object>(sharedScope1, Key));
|
||||
Assert.Null(await manager.ReadStateAsync<object>(sharedScope1, Key));
|
||||
|
||||
await manager.WriteStateAsync(sharedScope1, Key, new object());
|
||||
|
||||
// After writing, we should be able to read the value from the executor's scope
|
||||
// but not the shared scope yet
|
||||
Assert.NotNull(await manager.ReadStateAsync<object>(sharedScope1, Key));
|
||||
Assert.Null(await manager.ReadStateAsync<object>(sharedScope2, Key));
|
||||
|
||||
// Writes to one key should not impact other keys
|
||||
Assert.Null(await manager.ReadStateAsync<object>(sharedScope1, "key2"));
|
||||
|
||||
// Publish the write
|
||||
await manager.PublishUpdatesAsync(tracer: null);
|
||||
|
||||
// Now all the executors should be able to see the new state
|
||||
Assert.NotNull(await manager.ReadStateAsync<object>(sharedScope1, Key));
|
||||
Assert.NotNull(await manager.ReadStateAsync<object>(sharedScope2, Key));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Test_ConflictingWritesRaiseExceptionAsync()
|
||||
{
|
||||
ScopeId sharedScope1 = new("executor1", "sharedScope");
|
||||
ScopeId sharedScope2 = new("executor2", "sharedScope");
|
||||
|
||||
StateManager manager = new();
|
||||
|
||||
const string Key = "key1";
|
||||
const string Value1 = "1";
|
||||
const string Value2 = "2";
|
||||
|
||||
// Write values from both executors
|
||||
await manager.WriteStateAsync(sharedScope1, Key, Value1);
|
||||
await manager.WriteStateAsync(sharedScope2, Key, Value2);
|
||||
|
||||
// Check that reading each will result in the right value
|
||||
Assert.Equal(Value1, await manager.ReadStateAsync<string>(sharedScope1, Key));
|
||||
Assert.Equal(Value2, await manager.ReadStateAsync<string>(sharedScope2, Key));
|
||||
|
||||
// Try to publish the updates
|
||||
try
|
||||
{
|
||||
await manager.PublishUpdatesAsync(tracer: null);
|
||||
Assert.Fail("Expected InvalidOperationException due to conflicting writes.");
|
||||
}
|
||||
catch (InvalidOperationException)
|
||||
{
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Assert.Fail($"Expected InvalidOperationException, but got {ex.GetType().Name}.");
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user