diff --git a/dotnet/src/Microsoft.Agents.Workflows/Execution/StateManager.cs b/dotnet/src/Microsoft.Agents.Workflows/Execution/StateManager.cs index ca311a23fc..fcab3a1cd8 100644 --- a/dotnet/src/Microsoft.Agents.Workflows/Execution/StateManager.cs +++ b/dotnet/src/Microsoft.Agents.Workflows/Execution/StateManager.cs @@ -27,6 +27,73 @@ internal class StateManager return scope; } + private IEnumerable GetUpdatesForScopeStrict(ScopeId scopeId) + { + Throw.IfNull(scopeId); + + return this._queuedUpdates.Keys.Where(key => key.IsMatchingScope(scopeId, strict: true)); + } + + public ValueTask ClearStateAsync(string executorId, string? scopeName) + => this.ClearStateAsync(new ScopeId(Throw.IfNullOrEmpty(executorId), scopeName)); + + public async ValueTask ClearStateAsync(ScopeId scopeId) + { + Throw.IfNull(scopeId); + + if (this._scopes.TryGetValue(scopeId, out StateScope? scope)) + { + HashSet keysToDelete = await scope.ReadKeysAsync().ConfigureAwait(false); + + foreach (UpdateKey updateKey in this.GetUpdatesForScopeStrict(scopeId)) + { + StateUpdate update = this._queuedUpdates[updateKey]; + if (!update.IsDelete) + { + this._queuedUpdates[updateKey] = StateUpdate.Delete(update.Key); + } + + keysToDelete.Remove(update.Key); + } + + foreach (string key in keysToDelete) + { + UpdateKey updateKey = new(scopeId, key); + this._queuedUpdates[updateKey] = StateUpdate.Delete(key); + } + } + } + + private HashSet ApplyUnpublishedUpdates(ScopeId scopeId, HashSet keys) + { + // Apply any queued updates for this scope + foreach (UpdateKey key in this.GetUpdatesForScopeStrict(scopeId)) + { + StateUpdate update = this._queuedUpdates[key]; + if (update.IsDelete) + { + keys.Remove(update.Key); + } + else + { + // Add is idempotent on Sets + keys.Add(update.Key); + } + } + + return keys; + } + + public ValueTask> ReadKeysAsync(string executorId, string? scopeName = null) + => this.ReadKeysAsync(new ScopeId(Throw.IfNullOrEmpty(executorId), scopeName)); + + public async ValueTask> ReadKeysAsync(ScopeId scopeId) + { + StateScope scope = this.GetOrCreateScope(scopeId); + HashSet keys = await scope.ReadKeysAsync().ConfigureAwait(false); + return this.ApplyUnpublishedUpdates(scopeId, keys); + } + public ValueTask ReadStateAsync(string executorId, string? scopeName, string key) => this.ReadStateAsync(new ScopeId(Throw.IfNullOrEmpty(executorId), scopeName), key); @@ -91,7 +158,7 @@ internal class StateManager stateUpdates.Add(this._queuedUpdates[key]); } - if (updatesByScope.Count > 0 && tracer != null) + if (tracer != null && (updatesByScope.Count > 0)) { tracer.TraceStatePublished(); } diff --git a/dotnet/src/Microsoft.Agents.Workflows/Execution/StateScope.cs b/dotnet/src/Microsoft.Agents.Workflows/Execution/StateScope.cs index 6bd8bc18db..ce5d559337 100644 --- a/dotnet/src/Microsoft.Agents.Workflows/Execution/StateScope.cs +++ b/dotnet/src/Microsoft.Agents.Workflows/Execution/StateScope.cs @@ -23,6 +23,13 @@ internal class StateScope { } + public ValueTask> ReadKeysAsync() + { + HashSet keys = new(this._stateData.Keys, this._stateData.Comparer); + + return new(keys); + } + public ValueTask ReadStateAsync(string key) { Throw.IfNullOrEmpty(key); @@ -40,7 +47,7 @@ internal class StateScope foreach (string key in updates.Keys) { - if (updates[key].Count == 0) + if (updates == null || updates[key].Count == 0) { continue; } @@ -50,14 +57,14 @@ internal class StateScope throw new InvalidOperationException($"Expected exactly one update for key '{key}'."); } - StateUpdate upadte = updates[key][0]; - if (upadte.IsDelete) + StateUpdate update = updates[key][0]; + if (update.IsDelete) { this._stateData.Remove(key); } else { - this._stateData[key] = upadte.Value!; + this._stateData[key] = update.Value!; } } diff --git a/dotnet/src/Microsoft.Agents.Workflows/Execution/UpdateKey.cs b/dotnet/src/Microsoft.Agents.Workflows/Execution/UpdateKey.cs index 2f1252ea2a..3772d5b400 100644 --- a/dotnet/src/Microsoft.Agents.Workflows/Execution/UpdateKey.cs +++ b/dotnet/src/Microsoft.Agents.Workflows/Execution/UpdateKey.cs @@ -29,13 +29,17 @@ internal class UpdateKey(ScopeId scopeId, string key) return $"{this.ScopeId}/{this.Key}"; } + public bool IsMatchingScope(ScopeId scopeId, bool strict = false) + { + return this.ScopeId == scopeId && (!strict || this.ScopeId.ExecutorId == scopeId.ExecutorId); + } + public override bool Equals(object? obj) { if (obj is UpdateKey other) { // Unlike ScopeId, UpdateKey is equal only if both the Executor and ScopeName are the same - return this.ScopeId.ExecutorId == other.ScopeId.ExecutorId && - this.ScopeId.ScopeName == other.ScopeId.ScopeName && + return this.IsMatchingScope(other.ScopeId, strict: true) && this.Key == other.Key; } diff --git a/dotnet/src/Microsoft.Agents.Workflows/IWorkflowContext.cs b/dotnet/src/Microsoft.Agents.Workflows/IWorkflowContext.cs index b3199d0c32..c3278e41c4 100644 --- a/dotnet/src/Microsoft.Agents.Workflows/IWorkflowContext.cs +++ b/dotnet/src/Microsoft.Agents.Workflows/IWorkflowContext.cs @@ -1,5 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. +using System.Collections.Generic; using System.Threading.Tasks; namespace Microsoft.Agents.Workflows; @@ -33,10 +34,18 @@ public interface IWorkflowContext /// /// The type of the state value. /// The key of the state value. - /// The name of the scope. + /// An optional name that specifies the scope to read.If null, the default scope is + /// used. /// A representing the asynchronous operation. ValueTask ReadStateAsync(string key, string? scopeName = null); + /// + /// Asynchronously reads all state keys within the specified scope. + /// + /// An optional name that specifies the scope to read. If null, the default scope is + /// used. + ValueTask> ReadStateKeysAsync(string? scopeName = null); + /// /// Asynchronously updates the state of a queue entry identified by the specified key and optional scope. /// @@ -48,8 +57,21 @@ public interface IWorkflowContext /// The unique identifier for the queue entry to update. Cannot be null or empty. /// The value to set for the queue entry. If null, the entry's state may be cleared or reset depending on /// implementation. - /// An optional name that specifies the scope within which the queue entry resides. If null, the default scope is + /// An optional name that specifies the scope to update. If null, the default scope is /// used. /// A ValueTask that represents the asynchronous update operation. ValueTask QueueStateUpdateAsync(string key, T? value, string? scopeName = null); + + /// + /// Asynchronously clears all state entries within the specified scope. + /// + /// This semantically equivalent to retrieving all keys in the scope and deleting them one-by-one. + /// + /// + /// Subsequent reads by this executor will not find any entries in the cleared scope. Other executors will only + /// see the cleared state starting from the next SuperStep. + /// + /// An optional name that specifies the scope to clear. If null, the default scope is used. + /// A ValueTask that represents the asynchronous clear operation. + ValueTask QueueClearScopeAsync(string? scopeName = null); } diff --git a/dotnet/src/Microsoft.Agents.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.Workflows/InProc/InProcessRunnerContext.cs index f122f3058d..8892ac8cf0 100644 --- a/dotnet/src/Microsoft.Agents.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.Workflows/InProc/InProcessRunnerContext.cs @@ -105,11 +105,17 @@ internal class InProcessRunnerContext : IRunnerContext public ValueTask AddEventAsync(WorkflowEvent workflowEvent) => RunnerContext.AddEventAsync(workflowEvent); public ValueTask SendMessageAsync(object message, string? targetId = null) => RunnerContext.SendMessageAsync(ExecutorId, message, targetId); + public ValueTask ReadStateAsync(string key, string? scopeName = null) + => RunnerContext.StateManager.ReadStateAsync(ExecutorId, scopeName, key); + + public ValueTask> ReadStateKeysAsync(string? scopeName = null) + => RunnerContext.StateManager.ReadKeysAsync(ExecutorId, scopeName); + public ValueTask QueueStateUpdateAsync(string key, T? value, string? scopeName = null) => RunnerContext.StateManager.WriteStateAsync(ExecutorId, scopeName, key, value); - public ValueTask ReadStateAsync(string key, string? scopeName = null) - => RunnerContext.StateManager.ReadStateAsync(ExecutorId, scopeName, key); + public ValueTask QueueClearScopeAsync(string? scopeName = null) + => RunnerContext.StateManager.ClearStateAsync(ExecutorId, scopeName); } internal Task PrepareForCheckpointAsync(CancellationToken cancellation = default) diff --git a/dotnet/src/Microsoft.Agents.Workflows/ScopeId.cs b/dotnet/src/Microsoft.Agents.Workflows/ScopeId.cs index b638bb59f6..3f4c64354b 100644 --- a/dotnet/src/Microsoft.Agents.Workflows/ScopeId.cs +++ b/dotnet/src/Microsoft.Agents.Workflows/ScopeId.cs @@ -52,6 +52,28 @@ public class ScopeId(string executorId, string? scopeName = null) return false; } + /// + public static bool operator ==(ScopeId? left, ScopeId? right) + { + if (left is null && right == null) + { + return true; + } + + if (right is null) + { + return false; + } + + // The inversion here is necessary because the null analysis is incapable of proving to itself + // that left cannot be null here: If it was, either right is null, and we returned true, or right + // is not null, and we returned false. + return right.Equals(left); + } + + /// + public static bool operator !=(ScopeId? left, ScopeId? right) => !(left == right); + /// public override int GetHashCode() { diff --git a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/StateKeyObjectTests.cs b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/StateKeyObjectTests.cs new file mode 100644 index 0000000000..9ab2d4891b --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/StateKeyObjectTests.cs @@ -0,0 +1,97 @@ +// Copyright (c) Microsoft. All rights reserved. + +using FluentAssertions; +using Microsoft.Agents.Workflows.Execution; + +namespace Microsoft.Agents.Workflows.UnitTests; + +public class StateKeyObjectTests +{ + [Fact] + public void Test_ScopeId_Equality() + { + // The rules of ScopeId are simple: Private executor scopes (executorId, scopeId=null) are only equal to + // themselves. Public ScopeIds are equal when their scopeNames are equal, regardless of executorId. + + ScopeId privateScope1 = new("executor1", null); + ScopeId privateScope2 = new("executor2", null); + + Assert.NotEqual(privateScope1, privateScope2); + Assert.Equal(privateScope1, new ScopeId("executor1", null)); + + ScopeId sharedScope1 = new("executor1", "sharedScope"); + ScopeId sharedScope2 = new("executor2", "sharedScope"); + + Assert.Equal(sharedScope1, sharedScope2); + Assert.NotEqual(sharedScope1, new ScopeId("executor1", "differentScope")); + Assert.NotEqual(sharedScope1, privateScope1); + } + + [Fact] + public void Test_UpdateKey_Equality() + { + // The rules of UpdateKey are different from ScopeId. In the case of "shared scope", + // two update keys with different ExecutorIds are not the same. + + const string Key1 = "key1"; + const string Key2 = "key2"; + UpdateKey privateScope1Key = new("executor1", null, Key1); + UpdateKey privateScope1Key2 = new("executor1", null, Key2); + + Assert.NotEqual(privateScope1Key, privateScope1Key2); + + UpdateKey privateScope2Key = new("executor2", null, Key1); + + Assert.NotEqual(privateScope1Key, privateScope2Key); + + UpdateKey scope1Executor1Key = new("executor1", "sharedScope", Key1); + UpdateKey scope1Executor2Key = new("executor2", "sharedScope", Key1); + + Assert.NotEqual(scope1Executor1Key, scope1Executor2Key); + } + + [Fact] + public void Test_UpdateKey_IsMatchingScope() + { + const string Key1 = "key1"; + + UpdateKey privateScope1Key = new("executor1", null, Key1); + UpdateKey privateScope2Key = new("executor2", null, Key1); + + ScopeId privateScope1 = new("executor1", null); + ScopeId privateScope2 = new("executor2", null); + + ValidateMatch(privateScope1Key, privateScope1, expectedStrict: true, expectedLoose: true); + ValidateMatch(privateScope1Key, privateScope2, expectedStrict: false, expectedLoose: false); + ValidateMatch(privateScope2Key, privateScope1, expectedStrict: false, expectedLoose: false); + ValidateMatch(privateScope2Key, privateScope2, expectedStrict: true, expectedLoose: true); + + UpdateKey sharedScope1Key = new("executor1", "sharedScope", Key1); + UpdateKey sharedScope2Key = new("executor2", "sharedScope", Key1); + + ScopeId sharedScope1 = new("executor1", "sharedScope"); + ScopeId sharedScope2 = new("executor2", "sharedScope"); + + ValidateMatch(sharedScope1Key, sharedScope1, expectedStrict: true, expectedLoose: true); + ValidateMatch(sharedScope1Key, sharedScope2, expectedStrict: false, expectedLoose: true); + ValidateMatch(sharedScope2Key, sharedScope1, expectedStrict: false, expectedLoose: true); + ValidateMatch(sharedScope2Key, sharedScope2, expectedStrict: true, expectedLoose: true); + + // Cross checks between private and shared scopes should never match + ValidateMatch(privateScope1Key, sharedScope1, expectedStrict: false, expectedLoose: false); + ValidateMatch(privateScope1Key, sharedScope2, expectedStrict: false, expectedLoose: false); + ValidateMatch(privateScope2Key, sharedScope1, expectedStrict: false, expectedLoose: false); + ValidateMatch(privateScope2Key, sharedScope2, expectedStrict: false, expectedLoose: false); + + ValidateMatch(sharedScope1Key, privateScope1, expectedStrict: false, expectedLoose: false); + ValidateMatch(sharedScope1Key, privateScope2, expectedStrict: false, expectedLoose: false); + ValidateMatch(sharedScope2Key, privateScope1, expectedStrict: false, expectedLoose: false); + ValidateMatch(sharedScope2Key, privateScope2, expectedStrict: false, expectedLoose: false); + + void ValidateMatch(UpdateKey key, ScopeId scope, bool expectedStrict, bool expectedLoose) + { + key.IsMatchingScope(scope, strict: true).Should().Be(expectedStrict); + key.IsMatchingScope(scope, strict: false).Should().Be(expectedLoose); + } + } +} diff --git a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/StateManagerTests.cs b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/StateManagerTests.cs new file mode 100644 index 0000000000..e9554da976 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/StateManagerTests.cs @@ -0,0 +1,454 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using FluentAssertions; +using Microsoft.Agents.Workflows.Execution; + +namespace Microsoft.Agents.Workflows.UnitTests; + +public class StateManagerTests +{ + [Fact] + public async Task Test_SharedScope_ReadKeysAsync() + { + const string? ScopeName = "sharedScope"; + await RunScopeKeysTestAsync(ScopeName, isSharedScope: true); + } + + [Fact] + public async Task Test_PrivateScope_ReadKeysAsync() + { + const string? ScopeName = null; + await RunScopeKeysTestAsync(ScopeName, isSharedScope: false); + } + + private static async Task RunScopeKeysTestAsync(string? scopeName, bool isSharedScope) + { + const string SelfExecutorId = "executor1"; + const string OtherExecutorId = "executor2"; + const string Key1 = "key1"; + HashSet ExpectedAfterWrite = [Key1]; + + StateManager manager = new(); + ScopeId sharedScopeSelfView = new(SelfExecutorId, scopeName); + ScopeId sharedScopeOtherView = new(OtherExecutorId, scopeName); + + // Assert baseline: neither executor sees any keys + HashSet selfKeys = await manager.ReadKeysAsync(sharedScopeSelfView); + selfKeys.Should().BeEmpty("there should be no keys in an empty StateManager"); + + HashSet otherKeys = await manager.ReadKeysAsync(sharedScopeOtherView); + otherKeys.Should().BeEmpty("there should be no keys in an empty StateManager"); + + // Act 1: Write a key from the self executor's view of the shared scope + + await manager.WriteStateAsync(sharedScopeSelfView, Key1, "value1"); + + // Assert 1: The self executor should see the key immediately, but the other executor should not + selfKeys = await manager.ReadKeysAsync(sharedScopeSelfView); + selfKeys.SetEquals(ExpectedAfterWrite).Should().BeTrue("writes should be visible immediately to the writing executor"); + + otherKeys = await manager.ReadKeysAsync(sharedScopeOtherView); + otherKeys.Should().BeEmpty(isSharedScope ? "writes should not be visible to other executors until published" + : "writes to private scopes should not be visible across executors"); + + // Act 2: Publish the updates + await manager.PublishUpdatesAsync(tracer: null); + + // Assert 2: Both executors should see the key now, if sharedScope + selfKeys = await manager.ReadKeysAsync(sharedScopeSelfView); + selfKeys.SetEquals(ExpectedAfterWrite).Should().BeTrue("published writes should be visible to all executors"); + + otherKeys = await manager.ReadKeysAsync(sharedScopeOtherView); + + if (isSharedScope) + { + otherKeys.SetEquals(ExpectedAfterWrite).Should().BeTrue("published writes should be visible to all executors"); + } + else + { + otherKeys.Should().BeEmpty("writes to private scopes should not be visible across executors"); + } + + // Act 3: Clear the state from the self executor's view of the shared scope + await manager.WriteStateAsync(sharedScopeSelfView, Key1, null); + + // Assert 3: The self executor should not see the key immediately, but the other executor should still see it if sharedScope + selfKeys = await manager.ReadKeysAsync(sharedScopeSelfView); + selfKeys.Should().BeEmpty("deletes should be visible immediately to the writing executor"); + + otherKeys = await manager.ReadKeysAsync(sharedScopeOtherView); + if (isSharedScope) + { + otherKeys.SetEquals(ExpectedAfterWrite).Should().BeTrue("published writes should be visible to all executors"); + } + else + { + otherKeys.Should().BeEmpty("writes to private scopes should not be visible across executors"); + } + + // Act 4: Publish the updates + await manager.PublishUpdatesAsync(tracer: null); + + // Assert 4: Neither executor should see the key now + selfKeys = await manager.ReadKeysAsync(sharedScopeSelfView); + selfKeys.Should().BeEmpty("published deletes should be visible to all executors"); + + otherKeys = await manager.ReadKeysAsync(sharedScopeOtherView); + otherKeys.Should().BeEmpty(isSharedScope ? "published deletes should be visible to all executors" + : "writes to private scopes should not be visible across executors"); + } + + [Fact] + public async Task Test_SharedScope_ValueLifecycleAsync() + { + const string? ScopeName = "sharedScope"; + await RunValueLifecycleTestAsync(ScopeName, isSharedScope: true); + } + + [Fact] + public async Task Test_PrivateScope_ValueLifecycleAsync() + { + const string? ScopeName = null; + await RunValueLifecycleTestAsync(ScopeName, isSharedScope: false); + } + + private static async Task RunValueLifecycleTestAsync(string? scopeName, bool isSharedScope) + { + const string SelfExecutorId = "executor1"; + const string OtherExecutorId = "executor2"; + const string Key1 = "key1", Key2 = "key2"; + const string Value1 = "value1", Value2 = "value2"; + + StateManager manager = new(); + ScopeId scopeSelfView = new(SelfExecutorId, scopeName); + ScopeId scopeOtherView = new(OtherExecutorId, scopeName); + + isSharedScope.Should().Be(scopeSelfView == scopeOtherView); + + // Assert baseline: neither executor sees any keys or values + string? selfValue1 = await manager.ReadStateAsync(scopeSelfView, Key1); + string? selfValue2 = await manager.ReadStateAsync(scopeSelfView, Key2); + selfValue1.Should().BeNull("there should be no values in an empty StateManager"); + selfValue2.Should().BeNull("there should be no values in an empty StateManager"); + + string? otherValue1 = await manager.ReadStateAsync(scopeOtherView, Key1); + string? otherValue2 = await manager.ReadStateAsync(scopeOtherView, Key2); + otherValue1.Should().BeNull("there should be no values in an empty StateManager"); + otherValue2.Should().BeNull("there should be no values in an empty StateManager"); + + // Act 1: Write a value from the self executor's view of the shared scope + await manager.WriteStateAsync(scopeSelfView, Key1, Value1); + + // Assert 1: The self executor should see the value immediately, but the other executor should not + selfValue1 = await manager.ReadStateAsync(scopeSelfView, Key1); + selfValue1.Should().Be(Value1, "writes should be visible immediately to the writing executor"); + + selfValue2 = await manager.ReadStateAsync(scopeSelfView, Key2); + selfValue2.Should().BeNull("uninvolved keys' state/value should not change after a write"); + + otherValue1 = await manager.ReadStateAsync(scopeOtherView, Key1); + otherValue1.Should().BeNull(isSharedScope ? "writes should not be visible to other executors until published (key1: written by self, read by other)" + : "writes to private scopes should not be visible across executors"); + + otherValue2 = await manager.ReadStateAsync(scopeOtherView, Key2); + otherValue2.Should().BeNull("uninvolved keys' state/value should not change after a write"); + + // Act 2: Write a value from the other executor's view of the shared scope + await manager.WriteStateAsync(scopeOtherView, Key2, Value2); + + // Assert 2: The other executor should see the value immediately, but the self executor should not + selfValue1 = await manager.ReadStateAsync(scopeSelfView, Key1); + selfValue1.Should().Be(Value1, "uninvolved keys' state/value should not change after a write"); + + selfValue2 = await manager.ReadStateAsync(scopeSelfView, Key2); + selfValue2.Should().BeNull(isSharedScope ? "writes should not be visible to other executors until published (key2: written by other, read by self)" + : "writes to private scopes should not be visible across executors"); + + otherValue1 = await manager.ReadStateAsync(scopeOtherView, Key1); + otherValue1.Should().BeNull(isSharedScope ? "writes should not be visible to other executors until published (key1: written by self, read by other)" + : "writes to private scopes should not be visible across executors"); + + otherValue2 = await manager.ReadStateAsync(scopeOtherView, Key2); + otherValue2.Should().Be(Value2, "writes should be visible immediately to the writing executor"); + + // Act 3: Publish the updates + await manager.PublishUpdatesAsync(tracer: null); + + // Assert 3: Both executors should see both values now, if the scope is shared + selfValue1 = await manager.ReadStateAsync(scopeSelfView, Key1); + selfValue1.Should().Be(Value1, "published writes should be visible to all executors (key1: written by self, read by self)"); + + selfValue2 = await manager.ReadStateAsync(scopeSelfView, Key2); + if (isSharedScope) + { + selfValue2.Should().Be(Value2, "published writes should be visible to all executors (key2: written by other, read by self)"); + } + else + { + selfValue2.Should().BeNull("writes to private scopes should not be visible across executors"); + } + + otherValue1 = await manager.ReadStateAsync(scopeOtherView, Key1); + if (isSharedScope) + { + otherValue1.Should().Be(Value1, "published writes should be visible to all executors (key1: written by self, read by other)"); + } + else + { + otherValue1.Should().BeNull("writes to private scopes should not be visible across executors"); + } + + otherValue2 = await manager.ReadStateAsync(scopeOtherView, Key2); + otherValue2.Should().Be(Value2, "published writes should be visible to all executors (key2: written by other, read by other)"); + + // Act 4: Clear the value from the self executor's view of the shared scope + await manager.ClearStateAsync(scopeSelfView); + + // Assert 4: The self executor should not see either value immediately, but the other executor should still see both + selfValue1 = await manager.ReadStateAsync(scopeSelfView, Key1); + selfValue1.Should().BeNull("clears should be visible immediately to the writing executor"); + + selfValue2 = await manager.ReadStateAsync(scopeSelfView, Key2); + selfValue2.Should().BeNull(isSharedScope ? "clears should be visible immediately to the writing executor" + : "writes to private scopes should not be visible across executors"); + + otherValue1 = await manager.ReadStateAsync(scopeOtherView, Key1); + if (isSharedScope) + { + otherValue1.Should().Be(Value1, "clears should not be visible to other executors until published (key2: written by self, read by other)"); + } + else + { + otherValue1.Should().BeNull("writes to private scopes should not be visible across executors"); + } + + otherValue2 = await manager.ReadStateAsync(scopeOtherView, Key2); + otherValue2.Should().Be(Value2, isSharedScope ? "clears should not be visible to other executors until published (key2: written by self, read by other)" + : "writes to private scopes should not be visible across executors"); + + // Act 5: Publish the updates + await manager.PublishUpdatesAsync(tracer: null); + + // Assert 5: Neither executor should see either value now + selfValue1 = await manager.ReadStateAsync(scopeSelfView, Key1); + selfValue1.Should().BeNull("published clears should be visible to all executors"); + + selfValue2 = await manager.ReadStateAsync(scopeSelfView, Key2); + selfValue2.Should().BeNull(isSharedScope ? "published clears should be visible to all executors" + : "writes to private scopes should not be visible across executors"); + + otherValue1 = await manager.ReadStateAsync(scopeOtherView, Key1); + otherValue1.Should().BeNull(isSharedScope ? "published clears should be visible to all executors" + : "writes to private scopes should not be visible across executors"); + + otherValue2 = await manager.ReadStateAsync(scopeOtherView, Key2); + if (isSharedScope) + { + otherValue2.Should().BeNull("published clears should be visible to all executors"); + } + else + { + otherValue2.Should().Be(Value2, "writes to private scopes should not be visible across executors"); + } + + // Restore the written state of both keys + await manager.WriteStateAsync(scopeSelfView, Key1, Value1); + await manager.WriteStateAsync(scopeOtherView, Key2, Value2); + await manager.PublishUpdatesAsync(tracer: null); + + // Act 6: Delete Key1 from the other executor's view of the shared scope + await manager.WriteStateAsync(scopeOtherView, Key1, null); + + // Assert 6: The other executor should not see Key1 immediately, but should still see Key2. The self executor should still see both. + selfValue1 = await manager.ReadStateAsync(scopeSelfView, Key1); + selfValue1.Should().Be(Value1, isSharedScope ? "deletes should not be visible to other executors until published (key1: written by other, read by self)" + : "writes to private scopes should not be visible across executors"); + + selfValue2 = await manager.ReadStateAsync(scopeSelfView, Key2); + if (isSharedScope) + { + selfValue2.Should().Be(Value2, "uninvolved keys' state/value should not change after a delete"); + } + else + { + selfValue2.Should().BeNull("writes to private scopes should not be visible across executors"); + } + + otherValue1 = await manager.ReadStateAsync(scopeOtherView, Key1); + otherValue1.Should().BeNull(isSharedScope ? "deletes should be visible immediately to the writing executor" + : "writes to private scopes should not be visible across executors"); + + otherValue2 = await manager.ReadStateAsync(scopeOtherView, Key2); + otherValue2.Should().Be(Value2, "uninvolved keys' state/value should not change after a delete"); + + // Act 7: Delete Key2 from the self executor's view of the shared scope + await manager.WriteStateAsync(scopeSelfView, Key2, null); + + // Assert 7: The self executor should not see Key2 immediately, but should still see Key1. + // The other executor should not see Key1, but should still see Key2. + selfValue1 = await manager.ReadStateAsync(scopeSelfView, Key1); + selfValue1.Should().Be(Value1, isSharedScope ? "deletes should not be visible to other executors until published (key1: written by other, read by self)" + : "writes to private scopes should not be visible across executors"); + + selfValue2 = await manager.ReadStateAsync(scopeSelfView, Key2); + selfValue2.Should().BeNull(isSharedScope ? "deletes should be visible immediately to the writing executor" + : "writes to private scopes should not be visible across executors"); + + otherValue1 = await manager.ReadStateAsync(scopeOtherView, Key1); + otherValue1.Should().BeNull(isSharedScope ? "deletes should be visible immediately to the writing executor" + : "writes to private scopes should not be visible across executors"); + + otherValue2 = await manager.ReadStateAsync(scopeOtherView, Key2); + otherValue2.Should().Be(Value2, isSharedScope ? "deletes should not be visible to other executors until published (key2: written by self, read by other)" + : "writes to private scopes should not be visible across executors"); + + // Act 8: Publish the updates + await manager.PublishUpdatesAsync(tracer: null); + + // Assert 8: Neither executor should see either value now + selfValue1 = await manager.ReadStateAsync(scopeSelfView, Key1); + if (isSharedScope) + { + selfValue1.Should().BeNull("published deletes should be visible to all executors"); + } + else + { + selfValue1.Should().Be(Value1, "writes to private scopes should not be visible across executors"); + } + + selfValue2 = await manager.ReadStateAsync(scopeSelfView, Key2); + selfValue2.Should().BeNull(isSharedScope ? "published deletes should be visible to all executors" + : "writes to private scopes should not be visible across executors"); + + otherValue1 = await manager.ReadStateAsync(scopeOtherView, Key1); + otherValue1.Should().BeNull(isSharedScope ? "published deletes should be visible to all executors" + : "writes to private scopes should not be visible across executors"); + + otherValue2 = await manager.ReadStateAsync(scopeOtherView, Key2); + if (isSharedScope) + { + otherValue2.Should().BeNull("published deletes should be visible to all executors"); + } + else + { + otherValue2.Should().Be(Value2, "writes to private scopes should not be visible across executors"); + } + } + + [Fact] + public async Task Test_SharedScope_ConflictingUpdatesAsync() + { + const string? ScopeName = "sharedScope"; + await RunConflictingUpdatesTest_WriteVsWriteAsync(ScopeName, isSharedScope: true); + await RunConflictingUpdatesTest_WriteVsDeleteAsync(ScopeName, isSharedScope: true); + await RunConflictingUpdatesTest_WriteVsClearAsync(ScopeName, isSharedScope: true); + } + + [Fact] + public async Task Test_PrivateScope_ConflictingUpdatesAsync() + { + const string? ScopeName = null; + await RunConflictingUpdatesTest_WriteVsWriteAsync(ScopeName, isSharedScope: false); + await RunConflictingUpdatesTest_WriteVsDeleteAsync(ScopeName, isSharedScope: false); + await RunConflictingUpdatesTest_WriteVsClearAsync(ScopeName, isSharedScope: false); + } + + private static async Task RunConflictingUpdatesTest_WriteVsWriteAsync(string? scopeName, bool isSharedScope) + { + const string SelfExecutorId = "executor1"; + const string OtherExecutorId = "executor2"; + const string Key1 = "key1"; + const string Value1 = "value", Value2 = "value"; + + // Arrange + StateManager manager = new(); + ScopeId scopeSelfView = new(SelfExecutorId, scopeName); + ScopeId scopeOtherView = new(OtherExecutorId, scopeName); + isSharedScope.Should().Be(scopeSelfView == scopeOtherView); + + // Act 1: Write a conflicting value from the self executor's view of the shared scope + // Note that conflicting means update to the same key, not that the values are necessarily different. + // We do not have any logic to resolve equivalent updates from different executors as idempotent. + await manager.WriteStateAsync(scopeSelfView, Key1, Value1); + await manager.WriteStateAsync(scopeOtherView, Key1, Value2); + + Func act = async () => await manager.PublishUpdatesAsync(tracer: null); + + if (isSharedScope) + { + await act.Should().ThrowAsync("conflicting writes to the same key should raise an exception when published"); + } + else + { + await act.Should().NotThrowAsync("writes to private scopes should not be visible across executors"); + } + } + + private static async Task RunConflictingUpdatesTest_WriteVsDeleteAsync(string? scopeName, bool isSharedScope) + { + const string SelfExecutorId = "executor1"; + const string OtherExecutorId = "executor2"; + const string Key1 = "key1", Key2 = "key2"; + const string Value1 = "value", Value2 = "value"; + + // Arrange + StateManager manager = new(); + ScopeId scopeSelfView = new(SelfExecutorId, scopeName); + ScopeId scopeOtherView = new(OtherExecutorId, scopeName); + isSharedScope.Should().Be(scopeSelfView == scopeOtherView); + + await manager.WriteStateAsync(scopeSelfView, Key1, Value1); + await manager.WriteStateAsync(scopeOtherView, Key2, Value2); + await manager.PublishUpdatesAsync(tracer: null); + + // Act: Update the key from one executor and delete it from another + await manager.WriteStateAsync(scopeSelfView, Key1, "newValue"); + await manager.WriteStateAsync(scopeOtherView, Key1, null); + Func act = async () => await manager.PublishUpdatesAsync(tracer: null); + + if (isSharedScope) + { + await act.Should().ThrowAsync("conflicting writes (update vs delete) should raise an exception when published"); + } + else + { + await act.Should().NotThrowAsync("writes to private scopes should not be visible across executors"); + } + } + + private static async Task RunConflictingUpdatesTest_WriteVsClearAsync(string? scopeName, bool isSharedScope) + { + const string SelfExecutorId = "executor1"; + const string OtherExecutorId = "executor2"; + const string Key1 = "key1", Key2 = "key2"; + const string Value1 = "value", Value2 = "value"; + + // Arrange + StateManager manager = new(); + ScopeId scopeSelfView = new(SelfExecutorId, scopeName); + ScopeId scopeOtherView = new(OtherExecutorId, scopeName); + isSharedScope.Should().Be(scopeSelfView == scopeOtherView); + + await manager.WriteStateAsync(scopeSelfView, Key1, Value1); + await manager.WriteStateAsync(scopeOtherView, Key2, Value2); + await manager.PublishUpdatesAsync(tracer: null); + + // Act: Update the key from one, and clear the entire scope from another + await manager.WriteStateAsync(scopeSelfView, Key1, "newValue"); + await manager.ClearStateAsync(scopeOtherView); + Func act = async () => await manager.PublishUpdatesAsync(tracer: null); + + // Assert + if (isSharedScope) + { + await act.Should().ThrowAsync("conflicting writes (update vs clear) should raise an exception when published"); + } + else + { + await act.Should().NotThrowAsync("writes to private scopes should not be visible across executors"); + } + } +} diff --git a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/StateSmokeTest.cs b/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/StateSmokeTest.cs deleted file mode 100644 index 32054aae5d..0000000000 --- a/dotnet/tests/Microsoft.Agents.Workflows.UnitTests/StateSmokeTest.cs +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. - -using System; -using System.Threading.Tasks; -using Microsoft.Agents.Workflows.Execution; - -namespace Microsoft.Agents.Workflows.UnitTests; - -public class StateSmokeTest -{ - [Fact] - public void Test_ScopeId_Equality() - { - // The rules of ScopeId are simple: Private executor scopes (executorId, scopeId=null) are only equal to - // themselves. Public ScopeIds are equal when their scopeNames are equal, regardless of executorId. - - ScopeId privateScope1 = new("executor1", null); - ScopeId privateScope2 = new("executor2", null); - - Assert.NotEqual(privateScope1, privateScope2); - Assert.Equal(privateScope1, new ScopeId("executor1", null)); - - ScopeId sharedScope1 = new("executor1", "sharedScope"); - ScopeId sharedScope2 = new("executor2", "sharedScope"); - - Assert.Equal(sharedScope1, sharedScope2); - Assert.NotEqual(sharedScope1, new ScopeId("executor1", "differentScope")); - Assert.NotEqual(sharedScope1, privateScope1); - } - - [Fact] - public void Test_UpdateKey_Equality() - { - // The rules of UpdateKey are different from ScopeId. In the case of "shared scope", - // two update keys with different ExecutorIds are not the same. - - const string Key1 = "key1"; - const string Key2 = "key2"; - UpdateKey privateScope1Key = new("executor1", null, Key1); - UpdateKey privateScope1Key2 = new("executor1", null, Key2); - - Assert.NotEqual(privateScope1Key, privateScope1Key2); - - UpdateKey privateScope2Key = new("executor2", null, Key1); - - Assert.NotEqual(privateScope1Key, privateScope2Key); - - UpdateKey scope1Executor1Key = new("executor1", "sharedScope", Key1); - UpdateKey scope1Executor2Key = new("executor2", "sharedScope", Key1); - - Assert.NotEqual(scope1Executor1Key, scope1Executor2Key); - } - - [Fact] - public async Task Test_ReadQueueUpdateAsync() - { - ScopeId sharedScope1 = new("executor1", "sharedScope"); - ScopeId sharedScope2 = new("executor2", "sharedScope"); - - StateManager manager = new(); - - // Default reads on "object" should be null - const string Key = "key1"; - Assert.Null(await manager.ReadStateAsync(sharedScope1, Key)); - Assert.Null(await manager.ReadStateAsync(sharedScope1, Key)); - - await manager.WriteStateAsync(sharedScope1, Key, new object()); - - // After writing, we should be able to read the value from the executor's scope - // but not the shared scope yet - Assert.NotNull(await manager.ReadStateAsync(sharedScope1, Key)); - Assert.Null(await manager.ReadStateAsync(sharedScope2, Key)); - - // Writes to one key should not impact other keys - Assert.Null(await manager.ReadStateAsync(sharedScope1, "key2")); - - // Publish the write - await manager.PublishUpdatesAsync(tracer: null); - - // Now all the executors should be able to see the new state - Assert.NotNull(await manager.ReadStateAsync(sharedScope1, Key)); - Assert.NotNull(await manager.ReadStateAsync(sharedScope2, Key)); - } - - [Fact] - public async Task Test_ConflictingWritesRaiseExceptionAsync() - { - ScopeId sharedScope1 = new("executor1", "sharedScope"); - ScopeId sharedScope2 = new("executor2", "sharedScope"); - - StateManager manager = new(); - - const string Key = "key1"; - const string Value1 = "1"; - const string Value2 = "2"; - - // Write values from both executors - await manager.WriteStateAsync(sharedScope1, Key, Value1); - await manager.WriteStateAsync(sharedScope2, Key, Value2); - - // Check that reading each will result in the right value - Assert.Equal(Value1, await manager.ReadStateAsync(sharedScope1, Key)); - Assert.Equal(Value2, await manager.ReadStateAsync(sharedScope2, Key)); - - // Try to publish the updates - try - { - await manager.PublishUpdatesAsync(tracer: null); - Assert.Fail("Expected InvalidOperationException due to conflicting writes."); - } - catch (InvalidOperationException) - { - } - catch (Exception ex) - { - Assert.Fail($"Expected InvalidOperationException, but got {ex.GetType().Name}."); - } - } -}