mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
6a39d5a652
* Rename WorkflowOutputEvent.SourceId to ExecutorId for Python consistency - Rename SourceId property to ExecutorId in WorkflowOutputEvent - Add [Obsolete] SourceId property for backward compatibility - Update all test usages to use ExecutorId Resolves part of #2938 * Unify AgentResponse events with WorkflowOutputEvent (#2938) - Change AgentResponseEvent and AgentResponseUpdateEvent to inherit from WorkflowOutputEvent instead of ExecutorEvent - Update AIAgentHostExecutor and HandoffAgentExecutor to use YieldOutputAsync() instead of AddEventAsync() for agent outputs - Add special-casing in InProcessRunnerContext.YieldOutputAsync() to create specific event types for AgentResponse and AgentResponseUpdate, bypassing OutputFilter for backwards compatibility - Update TestRunContext and TestWorkflowContext with same special-casing - Add regression tests in AgentEventsTests * refactor: Seal AgentResponse events
88 lines
3.6 KiB
C#
88 lines
3.6 KiB
C#
// Copyright (c) Microsoft. All rights reserved.
|
|
|
|
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Agents.AI.Workflows.Execution;
|
|
|
|
namespace Microsoft.Agents.AI.Workflows.UnitTests;
|
|
|
|
/// <summary>
/// Test implementation of <see cref="IWorkflowContext"/> that records what an executor does
/// (messages sent, events emitted, outputs yielded, halts requested) in a <see cref="TestRunState"/>
/// instead of driving a workflow run.
/// </summary>
internal sealed class TestWorkflowContext : IWorkflowContext
{
    private readonly string _executorId;
    private readonly TestRunState _state;

    /// <summary>
    /// Creates a context bound to a single executor id.
    /// </summary>
    /// <param name="executorId">Id used to key this context's message/output queues and state scopes.</param>
    /// <param name="state">Run state to record into; a fresh <see cref="TestRunState"/> is created when <see langword="null"/>.</param>
    /// <param name="concurrentRunsEnabled">Value exposed through <see cref="ConcurrentRunsEnabled"/>.</param>
    public TestWorkflowContext(string executorId, TestRunState? state = null, bool concurrentRunsEnabled = false)
    {
        this.ConcurrentRunsEnabled = concurrentRunsEnabled;

        this._state = state ?? new TestRunState();
        this._executorId = executorId;
    }

    /// <summary>Gets the flag supplied at construction time.</summary>
    public bool ConcurrentRunsEnabled { get; }

    /// <summary>Gets the queue of messages sent by this executor, creating it in the run state on first access.</summary>
    public ConcurrentQueue<object> SentMessages
        => this._state.SentMessages.GetOrAdd(this._executorId, static _ => new ConcurrentQueue<object>());

    /// <summary>Gets the state manager owned by the underlying run state.</summary>
    public StateManager StateManager => this._state.StateManager;

    /// <summary>Gets the run state's event queue (not keyed by executor id).</summary>
    public ConcurrentQueue<WorkflowEvent> EmittedEvents => this._state.EmittedEvents;

    /// <summary>Gets the queue of outputs yielded by this executor, creating it in the run state on first access.</summary>
    public ConcurrentQueue<object> YieldedOutputs
        => this._state.YieldedOutputs.GetOrAdd(this._executorId, static _ => new ConcurrentQueue<object>());

    /// <summary>
    /// Records <paramref name="workflowEvent"/> in <see cref="EmittedEvents"/> and completes synchronously.
    /// </summary>
    /// <param name="workflowEvent">The event to record.</param>
    /// <param name="cancellationToken">Not used by this test implementation.</param>
    public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default)
    {
        this.EmittedEvents.Enqueue(workflowEvent);
        return default;
    }

    /// <summary>
    /// Records <paramref name="output"/> in <see cref="YieldedOutputs"/> and then emits the event describing it.
    /// </summary>
    /// <param name="output">The yielded output.</param>
    /// <param name="cancellationToken">Forwarded to <see cref="AddEventAsync"/>.</param>
    public ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default)
    {
        this.YieldedOutputs.Enqueue(output);

        // AgentResponseUpdate and AgentResponse get their dedicated event types
        // (consistent with InProcessRunnerContext.YieldOutputAsync); anything else
        // is wrapped in a plain WorkflowOutputEvent. The update case is tested first.
        WorkflowEvent outputEvent = output switch
        {
            AgentResponseUpdate update => new AgentResponseUpdateEvent(this._executorId, update),
            AgentResponse response => new AgentResponseEvent(this._executorId, response),
            _ => new WorkflowOutputEvent(output, this._executorId),
        };

        return this.AddEventAsync(outputEvent, cancellationToken);
    }

    /// <summary>
    /// Increments the halt-request counter on the run state and completes synchronously.
    /// </summary>
    public ValueTask RequestHaltAsync()
    {
        this._state.IncrementHaltRequests();
        return default;
    }

    /// <summary>
    /// Clears this executor's state scope via the <see cref="StateManager"/>.
    /// </summary>
    /// <param name="scopeName">Optional scope name combined with the executor id.</param>
    /// <param name="cancellationToken">Not used by this test implementation.</param>
    public ValueTask QueueClearScopeAsync(string? scopeName = null, CancellationToken cancellationToken = default)
        => this.StateManager.ClearStateAsync(this.ScopeFor(scopeName));

    /// <summary>
    /// Writes <paramref name="value"/> under <paramref name="key"/> in this executor's state scope.
    /// </summary>
    /// <param name="key">State key.</param>
    /// <param name="value">Value to write.</param>
    /// <param name="scopeName">Optional scope name combined with the executor id.</param>
    /// <param name="cancellationToken">Not used by this test implementation.</param>
    public ValueTask QueueStateUpdateAsync<T>(string key, T? value, string? scopeName = null, CancellationToken cancellationToken = default)
        => this.StateManager.WriteStateAsync(this.ScopeFor(scopeName), key, value);

    /// <summary>
    /// Reads the value stored under <paramref name="key"/> in this executor's state scope.
    /// </summary>
    /// <param name="key">State key.</param>
    /// <param name="scopeName">Optional scope name combined with the executor id.</param>
    /// <param name="cancellationToken">Not used by this test implementation.</param>
    public ValueTask<T?> ReadStateAsync<T>(string key, string? scopeName = null, CancellationToken cancellationToken = default)
        => this.StateManager.ReadStateAsync<T>(this.ScopeFor(scopeName), key);

    /// <summary>
    /// Delegates to <see cref="StateManager"/>'s read-or-initialize operation for <paramref name="key"/>
    /// in this executor's state scope, passing along <paramref name="initialStateFactory"/>.
    /// </summary>
    /// <param name="key">State key.</param>
    /// <param name="initialStateFactory">Factory handed to the state manager for the initial value.</param>
    /// <param name="scopeName">Optional scope name combined with the executor id.</param>
    /// <param name="cancellationToken">Not used by this test implementation.</param>
    public ValueTask<T> ReadOrInitStateAsync<T>(string key, Func<T> initialStateFactory, string? scopeName = null, CancellationToken cancellationToken = default)
        => this.StateManager.ReadOrInitStateAsync(this.ScopeFor(scopeName), key, initialStateFactory);

    /// <summary>
    /// Reads the set of keys present in this executor's state scope.
    /// </summary>
    /// <param name="scopeName">Optional scope name combined with the executor id.</param>
    /// <param name="cancellationToken">Not used by this test implementation.</param>
    public ValueTask<HashSet<string>> ReadStateKeysAsync(string? scopeName = null, CancellationToken cancellationToken = default)
        => this.StateManager.ReadKeysAsync(this.ScopeFor(scopeName));

    /// <summary>
    /// Records <paramref name="message"/> in <see cref="SentMessages"/> and completes synchronously.
    /// </summary>
    /// <param name="message">The message to record.</param>
    /// <param name="targetId">Not used by this test implementation; the message is recorded regardless of target.</param>
    /// <param name="cancellationToken">Not used by this test implementation.</param>
    public ValueTask SendMessageAsync(object message, string? targetId = null, CancellationToken cancellationToken = default)
    {
        this.SentMessages.Enqueue(message);
        return default;
    }

    /// <summary>Always <see langword="null"/> in this test implementation.</summary>
    public IReadOnlyDictionary<string, string>? TraceContext => null;

    // Builds the scope identifier for this executor and the given (optional) scope name.
    private ScopeId ScopeFor(string? scopeName) => new(this._executorId, scopeName);
}
|