diff --git a/docs/decisions/0003-agent-opentelemetry-instrumentation.md b/docs/decisions/0003-agent-opentelemetry-instrumentation.md index 9b76eee411..863387b5cb 100644 --- a/docs/decisions/0003-agent-opentelemetry-instrumentation.md +++ b/docs/decisions/0003-agent-opentelemetry-instrumentation.md @@ -139,14 +139,6 @@ using var telemetryAgent = baseAgent.WithOpenTelemetry(); var response = await telemetryAgent.RunAsync(messages); ``` -### Integration with AppContext Switch - -The implementation integrates with the standard .NET telemetry enablement pattern: - -```csharp -AppContext.SetSwitch("Microsoft.Extensions.AI.Agents.EnableTelemetry", true); -``` - ### Relationship to Microsoft.Extensions.AI This implementation follows the exact patterns established by Microsoft.Extensions.AI's OpenTelemetry instrumentation, ensuring consistency across the AI ecosystem and leveraging proven patterns for telemetry integration. diff --git a/dotnet/Directory.Packages.props b/dotnet/Directory.Packages.props index bc0b0dd0db..558b25a90e 100644 --- a/dotnet/Directory.Packages.props +++ b/dotnet/Directory.Packages.props @@ -24,6 +24,7 @@ + diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index 246768b367..4d44098cd0 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -107,6 +107,9 @@ + + + diff --git a/dotnet/samples/GettingStarted/Workflows/Observability/AspireDashboard/AspireDashboard.csproj b/dotnet/samples/GettingStarted/Workflows/Observability/AspireDashboard/AspireDashboard.csproj new file mode 100644 index 0000000000..807bc89323 --- /dev/null +++ b/dotnet/samples/GettingStarted/Workflows/Observability/AspireDashboard/AspireDashboard.csproj @@ -0,0 +1,22 @@ + + + + Exe + net9.0 + + enable + disable + + + + + + + + + + + + + + \ No newline at end of file diff --git a/dotnet/samples/GettingStarted/Workflows/Observability/AspireDashboard/Program.cs b/dotnet/samples/GettingStarted/Workflows/Observability/AspireDashboard/Program.cs new file mode 100644 index 0000000000..18d6259cc9 --- /dev/null +++ b/dotnet/samples/GettingStarted/Workflows/Observability/AspireDashboard/Program.cs @@ -0,0 +1,101 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows; +using Microsoft.Agents.AI.Workflows.Reflection; +using OpenTelemetry; +using OpenTelemetry.Logs; +using OpenTelemetry.Metrics; +using OpenTelemetry.Resources; +using OpenTelemetry.Trace; + +namespace WorkflowObservabilitySample; + +/// +/// This sample shows how to enable observability in a workflow and send the traces +/// to be visualized in Aspire Dashboard. +/// +/// In this example, we create a simple text processing pipeline that: +/// 1. Takes input text and converts it to uppercase using an UppercaseExecutor +/// 2. Takes the uppercase text and reverses it using a ReverseTextExecutor +/// +/// The executors are connected sequentially, so data flows from one to the next in order. +/// For input "Hello, World!", the workflow produces "!DLROW ,OLLEH". +/// +public static class Program +{ + private const string SourceName = "Workflow.Sample"; + private static readonly ActivitySource s_activitySource = new(SourceName); + + private static async Task Main() + { + // Configure OpenTelemetry for Aspire dashboard + var otlpEndpoint = Environment.GetEnvironmentVariable("OTLP_ENDPOINT") ?? "http://localhost:4317"; + + var resourceBuilder = ResourceBuilder + .CreateDefault() + .AddService("WorkflowSample"); + + using var traceProvider = Sdk.CreateTracerProviderBuilder() + .SetResourceBuilder(resourceBuilder) + .AddSource("Microsoft.Agents.AI.Workflows*") + .AddSource(SourceName) + .AddOtlpExporter(options => options.Endpoint = new Uri(otlpEndpoint)) + .Build(); + + // Start a root activity for the application + using var activity = s_activitySource.StartActivity("main"); + Console.WriteLine($"Operation/Trace ID: {Activity.Current?.TraceId}"); + + // Create the executors + UppercaseExecutor uppercase = new(); + ReverseTextExecutor reverse = new(); + + // Build the workflow by connecting executors sequentially + var workflow = new WorkflowBuilder(uppercase) + .AddEdge(uppercase, reverse) + .Build(); + + // Execute the workflow with input data + Run run = await InProcessExecution.RunAsync(workflow, "Hello, World!"); + foreach (WorkflowEvent evt in run.NewEvents) + { + if (evt is ExecutorCompletedEvent executorComplete) + { + Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}"); + } + } + } +} + +/// +/// First executor: converts input text to uppercase. +/// +internal sealed class UppercaseExecutor() : ReflectingExecutor("UppercaseExecutor"), IMessageHandler +{ + /// + /// Processes the input message by converting it to uppercase. + /// + /// The input text to convert + /// Workflow context for accessing workflow services and adding events + /// The input text converted to uppercase + public async ValueTask HandleAsync(string message, IWorkflowContext context) => + message.ToUpperInvariant(); // The return value will be sent as a message along an edge to subsequent executors +} + +/// +/// Second executor: reverses the input text and completes the workflow. +/// +internal sealed class ReverseTextExecutor() : ReflectingExecutor("ReverseTextExecutor"), IMessageHandler +{ + /// + /// Processes the input message by reversing the text. + /// + /// The input text to reverse + /// Workflow context for accessing workflow services and adding events + /// The input text reversed + public async ValueTask HandleAsync(string message, IWorkflowContext context) => new string(message.Reverse().ToArray()); +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/DeclarativeWorkflowContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/DeclarativeWorkflowContext.cs index cabec25607..3b01032467 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/DeclarativeWorkflowContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows.Declarative/Interpreter/DeclarativeWorkflowContext.cs @@ -29,6 +29,7 @@ internal sealed class DeclarativeWorkflowContext : IWorkflowContext private IWorkflowContext Source { get; } public WorkflowFormulaState State { get; } + public IReadOnlyDictionary? TraceContext => this.Source.TraceContext; /// public ValueTask AddEventAsync(WorkflowEvent workflowEvent) => this.Source.AddEventAsync(workflowEvent); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs index 61c7ad7b66..ee303c500b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/DirectEdgeRunner.cs @@ -1,36 +1,54 @@ // Copyright (c) Microsoft. All rights reserved. +using System; using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.Observability; namespace Microsoft.Agents.AI.Workflows.Execution; internal sealed class DirectEdgeRunner(IRunnerContext runContext, DirectEdgeData edgeData) : EdgeRunner(runContext, edgeData) { - public IWorkflowContext WorkflowContext { get; } = runContext.Bind(edgeData.SinkId); - private async ValueTask FindRouterAsync(IStepTracer? tracer) => await this.RunContext.EnsureExecutorAsync(this.EdgeData.SinkId, tracer) .ConfigureAwait(false); protected internal override async ValueTask ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer) { + using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess); + activity? + .SetTag(Tags.EdgeGroupType, nameof(DirectEdgeRunner)) + .SetTag(Tags.MessageSourceId, this.EdgeData.SourceId) + .SetTag(Tags.MessageTargetId, this.EdgeData.SinkId); + if (envelope.TargetId is not null && this.EdgeData.SinkId != envelope.TargetId) { + activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedTargetMismatch); return null; } object message = envelope.Message; - if (this.EdgeData.Condition is not null && !this.EdgeData.Condition(message)) + try { - return null; - } - - Executor target = await this.FindRouterAsync(stepTracer).ConfigureAwait(false); - if (target.CanHandle(envelope.MessageType)) - { - return new DeliveryMapping(envelope, target); + if (this.EdgeData.Condition is not null && !this.EdgeData.Condition(message)) + { + activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedConditionFalse); + return null; + } + + Executor target = await this.FindRouterAsync(stepTracer).ConfigureAwait(false); + if (target.CanHandle(envelope.MessageType)) + { + activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Delivered); + return new DeliveryMapping(envelope, target); + } + } + catch (Exception) when (activity is not null) + { + activity.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Exception); + throw; } + activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedTypeMismatch); return null; } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs index 31bb01da1e..d71fa539b3 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/EdgeRunner.cs @@ -1,5 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. +using System.Diagnostics; using System.Threading.Tasks; using Microsoft.Shared.Diagnostics; @@ -13,6 +14,9 @@ internal interface IStatefulEdgeRunner internal abstract class EdgeRunner { + protected static readonly string s_namespace = typeof(EdgeRunner).Namespace!; + protected static readonly ActivitySource s_activitySource = new(s_namespace); + // TODO: Can this be sync? protected internal abstract ValueTask ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs index b79599b7d1..02c0252af3 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanInEdgeRunner.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.Observability; namespace Microsoft.Agents.AI.Workflows.Execution; @@ -18,8 +19,14 @@ internal sealed class FanInEdgeRunner(IRunnerContext runContext, FanInEdgeData e { Debug.Assert(!envelope.IsExternal, "FanIn edges should never be chased from external input"); + using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess); + activity? + .SetTag(Tags.EdgeGroupType, nameof(FanInEdgeRunner)) + .SetTag(Tags.MessageTargetId, this.EdgeData.SinkId); + if (envelope.TargetId is not null && this.EdgeData.SinkId != envelope.TargetId) { + activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedTargetMismatch); return null; } @@ -28,14 +35,30 @@ internal sealed class FanInEdgeRunner(IRunnerContext runContext, FanInEdgeData e if (releasedMessages is null) { // Not ready to process yet. + activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Buffered); return null; } - // TODO: Filter messages based on accepted input types? - Executor target = await this.RunContext.EnsureExecutorAsync(this.EdgeData.SinkId, stepTracer) - .ConfigureAwait(false); + try + { + // TODO: Filter messages based on accepted input types? + Executor target = await this.RunContext.EnsureExecutorAsync(this.EdgeData.SinkId, stepTracer) + .ConfigureAwait(false); + // Materialize the filtered list via ToList() to avoid multiple enumerations + var finalReleasedMessages = releasedMessages.Where(envelope => target.CanHandle(envelope.MessageType)).ToList(); + if (finalReleasedMessages.Count == 0) + { + activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedTypeMismatch); + return null; + } - return new DeliveryMapping(releasedMessages.Where(envelope => target.CanHandle(envelope.MessageType)), target); + return new DeliveryMapping(finalReleasedMessages, target); + } + catch (Exception) when (activity is not null) + { + activity.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Exception); + throw; + } } public ValueTask ExportStateAsync() diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs index 30729c1c0d..aa6133955d 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/FanOutEdgeRunner.cs @@ -1,40 +1,61 @@ // Copyright (c) Microsoft. All rights reserved. +using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.Observability; namespace Microsoft.Agents.AI.Workflows.Execution; internal sealed class FanOutEdgeRunner(IRunnerContext runContext, FanOutEdgeData edgeData) : EdgeRunner(runContext, edgeData) { - private Dictionary BoundContexts { get; } - = edgeData.SinkIds.ToDictionary( - sinkId => sinkId, - runContext.Bind); - protected internal override async ValueTask ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer) { + using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess); + activity? + .SetTag(Tags.EdgeGroupType, nameof(FanOutEdgeRunner)) + .SetTag(Tags.MessageSourceId, this.EdgeData.SourceId); + object message = envelope.Message; - IEnumerable targetIds = - this.EdgeData.EdgeAssigner is null - ? this.EdgeData.SinkIds - : this.EdgeData.EdgeAssigner(message, this.BoundContexts.Count) - .Select(i => this.EdgeData.SinkIds[i]); - Executor[] result = await Task.WhenAll(targetIds.Where(IsValidTarget) - .Select(tid => this.RunContext.EnsureExecutorAsync(tid, stepTracer) - .AsTask())) - .ConfigureAwait(false); - - if (result.Length == 0) + try { - return null; - } + IEnumerable targetIds = + this.EdgeData.EdgeAssigner is null + ? this.EdgeData.SinkIds + : this.EdgeData.EdgeAssigner(message, this.EdgeData.SinkIds.Count) + .Select(i => this.EdgeData.SinkIds[i]); - IEnumerable validTargets = result.Where(t => t.CanHandle(envelope.MessageType)); - return new DeliveryMapping(envelope, validTargets); + Executor[] result = await Task.WhenAll(targetIds.Where(IsValidTarget) + .Select(tid => this.RunContext.EnsureExecutorAsync(tid, stepTracer) + .AsTask())) + .ConfigureAwait(false); + + if (result.Length == 0) + { + activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedTargetMismatch); + return null; + } + + IEnumerable validTargets = result.Where(t => t.CanHandle(envelope.MessageType)); + + if (!validTargets.Any()) + { + activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedTypeMismatch); + return null; + } + + activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Delivered); + + return new DeliveryMapping(envelope, validTargets); + } + catch (Exception) when (activity is not null) + { + activity.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Exception); + throw; + } bool IsValidTarget(string targetId) { diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/IRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/IRunnerContext.cs index be427d7702..fab24f4085 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/IRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/IRunnerContext.cs @@ -1,5 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. +using System.Collections.Generic; using System.Threading.Tasks; namespace Microsoft.Agents.AI.Workflows.Execution; @@ -10,6 +11,6 @@ internal interface IRunnerContext : IExternalRequestSink ValueTask SendMessageAsync(string sourceId, object message, string? targetId = null); ValueTask AdvanceAsync(); - IWorkflowContext Bind(string executorId); + IWorkflowContext Bind(string executorId, Dictionary? traceContext = null); ValueTask EnsureExecutorAsync(string executorId, IStepTracer? tracer); } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs index 6c317aa32d..7b892e9468 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/ISuperStepRunner.cs @@ -10,6 +10,8 @@ internal interface ISuperStepRunner { string RunId { get; } + string StartExecutorId { get; } + bool HasUnservicedRequests { get; } bool HasUnprocessedMessages { get; } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/InputEdgeRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/InputEdgeRunner.cs index 54a27baded..1174ac7ea2 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/InputEdgeRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/InputEdgeRunner.cs @@ -1,7 +1,9 @@ // Copyright (c) Microsoft. All rights reserved. +using System; using System.Diagnostics; using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.Observability; using Microsoft.Shared.Diagnostics; namespace Microsoft.Agents.AI.Workflows.Execution; @@ -9,8 +11,6 @@ namespace Microsoft.Agents.AI.Workflows.Execution; internal sealed class InputEdgeRunner(IRunnerContext runContext, string sinkId) : EdgeRunner(runContext, sinkId) { - public IWorkflowContext WorkflowContext { get; } = runContext.Bind(sinkId); - public static InputEdgeRunner ForPort(IRunnerContext runContext, InputPort port) { Throw.IfNull(port); @@ -22,13 +22,30 @@ internal sealed class InputEdgeRunner(IRunnerContext runContext, string sinkId) protected internal override async ValueTask ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer) { Debug.Assert(envelope.IsExternal, "Input edges should only be chased from external input"); - Executor target = await this.FindExecutorAsync(stepTracer).ConfigureAwait(false); - if (target.CanHandle(envelope.MessageType)) - { - return new DeliveryMapping(envelope, target); - } - return null; + using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess); + activity? + .SetTag(Tags.EdgeGroupType, nameof(InputEdgeRunner)) + .SetTag(Tags.MessageSourceId, envelope.SourceId) + .SetTag(Tags.MessageTargetId, this.EdgeData); + + try + { + Executor target = await this.FindExecutorAsync(stepTracer).ConfigureAwait(false); + if (target.CanHandle(envelope.MessageType)) + { + activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Delivered); + return new DeliveryMapping(envelope, target); + } + + activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedTypeMismatch); + return null; + } + catch (Exception) when (activity is not null) + { + activity.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Exception); + throw; + } } private async ValueTask FindExecutorAsync(IStepTracer? tracer) => await this.RunContext.EnsureExecutorAsync(this.EdgeData, tracer).ConfigureAwait(false); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/MessageEnvelope.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/MessageEnvelope.cs index 420175c943..d9e5665ec5 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/MessageEnvelope.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/MessageEnvelope.cs @@ -1,25 +1,37 @@ // Copyright (c) Microsoft. All rights reserved. using System; +using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using Microsoft.Agents.AI.Workflows.Checkpointing; namespace Microsoft.Agents.AI.Workflows.Execution; -internal sealed class MessageEnvelope(object message, ExecutorIdentity source, TypeId? declaredType = null, string? targetId = null) +internal sealed class MessageEnvelope( + object message, + ExecutorIdentity source, + TypeId? declaredType = null, + string? targetId = null, + Dictionary? traceContext = null) { public TypeId MessageType => declaredType ?? new(message.GetType()); public object Message => message; public ExecutorIdentity Source => source; public string? TargetId => targetId; + public Dictionary? TraceContext => traceContext; + [MemberNotNullWhen(false, nameof(SourceId))] public bool IsExternal => this.Source == ExecutorIdentity.None; public string? SourceId => this.Source.Id; - internal MessageEnvelope(object message, ExecutorIdentity source, Type declaredType, string? targetId = null) - : this(message, source, new TypeId(declaredType), targetId) + internal MessageEnvelope( + object message, + ExecutorIdentity source, + Type declaredType, + string? targetId = null, + Dictionary? traceContext = null) : this(message, source, new TypeId(declaredType), targetId, traceContext) { if (!declaredType.IsInstanceOfType(message)) { diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs index 65bc6e8732..e273b4b8a0 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Executor.cs @@ -8,6 +8,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Agents.AI.Workflows.Checkpointing; using Microsoft.Agents.AI.Workflows.Execution; +using Microsoft.Agents.AI.Workflows.Observability; using Microsoft.Agents.AI.Workflows.Reflection; namespace Microsoft.Agents.AI.Workflows; @@ -25,6 +26,9 @@ public abstract class Executor : IIdentified private readonly ExecutorOptions _options; + private static readonly string s_namespace = typeof(Executor).Namespace!; + private static readonly ActivitySource s_activitySource = new(s_namespace); + /// /// Initialize the executor with a unique identifier /// @@ -88,6 +92,12 @@ public abstract class Executor : IIdentified /// An exception is generated while handling the message. public async ValueTask ExecuteAsync(object message, TypeId messageType, IWorkflowContext context) { + using var activity = s_activitySource.StartActivity(ActivityNames.ExecutorProcess, ActivityKind.Internal); + activity?.SetTag(Tags.ExecutorId, this.Id) + .SetTag(Tags.ExecutorType, this.GetType().FullName) + .SetTag(Tags.MessageType, messageType.TypeName) + .CreateSourceLinks(context.TraceContext); + await context.AddEventAsync(new ExecutorInvokedEvent(this.Id, message)).ConfigureAwait(false); CallResult? result = await this.Router.RouteMessageAsync(message, context, requireRoute: true) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowContext.cs index 936bc337df..eed3b15d72 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/IWorkflowContext.cs @@ -92,4 +92,9 @@ public interface IWorkflowContext /// An optional name that specifies the scope to clear. If null, the default scope is used. /// A ValueTask that represents the asynchronous clear operation. ValueTask QueueClearScopeAsync(string? scopeName = null); + + /// + /// The trace context associated with the current message about to be processed by the executor, if any. + /// + IReadOnlyDictionary? TraceContext { get; } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs index e82292b753..bc0df57c36 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs @@ -23,6 +23,7 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingRunner public InProcessRunner(Workflow workflow, ICheckpointManager? checkpointManager, string? runId = null, params Type[] knownValidInputTypes) { this.RunId = runId ?? Guid.NewGuid().ToString("N"); + this.StartExecutorId = workflow.StartExecutorId; this.Workflow = Throw.IfNull(workflow); this.RunContext = new InProcessRunnerContext(workflow, this.RunId, this.StepTracer); @@ -38,6 +39,9 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingRunner /// public string RunId { get; } + /// + public string StartExecutorId { get; } + private readonly HashSet _knownValidInputTypes; public async ValueTask IsValidInputTypeAsync(Type messageType) { @@ -209,8 +213,11 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingRunner this.StepTracer.TraceActivated(receiverId); foreach (MessageEnvelope envelope in envelopes) { - await executor.ExecuteAsync(envelope.Message, envelope.MessageType, this.RunContext.Bind(receiverId)) - .ConfigureAwait(false); + await executor.ExecuteAsync( + envelope.Message, + envelope.MessageType, + this.RunContext.Bind(receiverId, envelope.TraceContext) + ).ConfigureAwait(false); } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs index 49741bb85f..9743498700 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs @@ -3,15 +3,19 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Agents.AI.Workflows.Checkpointing; using Microsoft.Agents.AI.Workflows.Execution; +using Microsoft.Agents.AI.Workflows.Observability; using Microsoft.Agents.AI.Workflows.Specialized; using Microsoft.Extensions.Logging; using Microsoft.Shared.Diagnostics; +using OpenTelemetry; +using OpenTelemetry.Context.Propagation; namespace Microsoft.Agents.AI.Workflows.InProc; @@ -132,10 +136,25 @@ internal sealed class InProcessRunnerContext : IRunnerContext return default; } + private static readonly string s_namespace = typeof(IWorkflowContext).Namespace!; + private static readonly ActivitySource s_activitySource = new(s_namespace); + public async ValueTask SendMessageAsync(string sourceId, object message, string? targetId = null) { + using Activity? activity = s_activitySource.StartActivity(ActivityNames.MessageSend, ActivityKind.Producer); + // Create a carrier for trace context propagation + var traceContext = activity is null ? null : new Dictionary(); + if (traceContext is not null) + { + // Inject the current activity context into the carrier + Propagators.DefaultTextMapPropagator.Inject( + new PropagationContext(activity?.Context ?? default, Baggage.Current), + traceContext, + (carrier, key, value) => carrier[key] = value); + } + this.CheckEnded(); - MessageEnvelope envelope = new(message, sourceId, targetId: targetId); + MessageEnvelope envelope = new(message, sourceId, targetId: targetId, traceContext: traceContext); if (this._workflow.Edges.TryGetValue(sourceId, out HashSet? edges)) { @@ -150,10 +169,10 @@ internal sealed class InProcessRunnerContext : IRunnerContext } } - public IWorkflowContext Bind(string executorId) + public IWorkflowContext Bind(string executorId, Dictionary? traceContext = null) { this.CheckEnded(); - return new BoundContext(this, executorId, this._outputFilter); + return new BoundContext(this, executorId, this._outputFilter, traceContext); } public ValueTask PostAsync(ExternalRequest request) @@ -173,10 +192,18 @@ internal sealed class InProcessRunnerContext : IRunnerContext internal StateManager StateManager { get; } = new(); - private sealed class BoundContext(InProcessRunnerContext RunnerContext, string ExecutorId, OutputFilter outputFilter) : IWorkflowContext + private sealed class BoundContext( + InProcessRunnerContext RunnerContext, + string ExecutorId, + OutputFilter outputFilter, + Dictionary? traceContext) : IWorkflowContext { public ValueTask AddEventAsync(WorkflowEvent workflowEvent) => RunnerContext.AddEventAsync(workflowEvent); - public ValueTask SendMessageAsync(object message, string? targetId = null) => RunnerContext.SendMessageAsync(ExecutorId, message, targetId); + + public ValueTask SendMessageAsync(object message, string? targetId = null) + { + return RunnerContext.SendMessageAsync(ExecutorId, message, targetId); + } public async ValueTask YieldOutputAsync(object output) { @@ -208,6 +235,8 @@ internal sealed class InProcessRunnerContext : IRunnerContext public ValueTask QueueClearScopeAsync(string? scopeName = null) => RunnerContext.StateManager.ClearStateAsync(ExecutorId, scopeName); + + public IReadOnlyDictionary? TraceContext => traceContext; } internal Task PrepareForCheckpointAsync(CancellationToken cancellationToken = default) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Microsoft.Agents.AI.Workflows.csproj b/dotnet/src/Microsoft.Agents.AI.Workflows/Microsoft.Agents.AI.Workflows.csproj index fa0a0ee4ec..e23beb40e1 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Microsoft.Agents.AI.Workflows.csproj +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Microsoft.Agents.AI.Workflows.csproj @@ -29,6 +29,10 @@ + + + + Microsoft Agent Framework Workflows diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityExtensions.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityExtensions.cs new file mode 100644 index 0000000000..e56f62bb46 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityExtensions.cs @@ -0,0 +1,55 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using OpenTelemetry.Context.Propagation; + +namespace Microsoft.Agents.AI.Workflows.Observability; + +internal static class ActivityExtensions +{ + /// + /// Capture exception details in the activity. + /// + /// The activity to capture exception details in. + /// The exception to capture. + /// + /// This method adds standard error tags to the activity and logs an event with exception details. + /// + internal static void CaptureException(this Activity? activity, Exception exception) + { + activity?.SetTag(Tags.ErrorType, exception.GetType().FullName) + .AddException(exception) + .SetStatus(ActivityStatusCode.Error, exception.Message); + } + + internal static void SetEdgeRunnerDeliveryStatus(this Activity? activity, EdgeRunnerDeliveryStatus status) + { + var delivered = status == EdgeRunnerDeliveryStatus.Delivered; + activity? + .SetTag(Tags.EdgeGroupDelivered, delivered) + .SetTag(Tags.EdgeGroupDeliveryStatus, status.ToStringValue()); + } + + /// + /// Executor processing spans are not nested, they are siblings. + /// We use links to represent the causal relationship between them. + /// + internal static void CreateSourceLinks(this Activity? activity, IReadOnlyDictionary? traceContext) + { + if (activity is null || traceContext is null) + { + return; + } + + // Extract the propagation context from the dictionary + var propagationContext = Propagators.DefaultTextMapPropagator.Extract( + default, + traceContext, + (carrier, key) => carrier.TryGetValue(key, out var value) ? [value] : Array.Empty()); + + // Create a link to the source activity + activity.AddLink(new ActivityLink(propagationContext.ActivityContext)); + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs new file mode 100644 index 0000000000..a845915a96 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.Workflows.Observability; + +internal static class ActivityNames +{ + public const string WorkflowBuild = "workflow.build"; + public const string WorkflowRun = "workflow.run"; + public const string MessageSend = "message.send"; + public const string ExecutorProcess = "executor.process"; + public const string EdgeGroupProcess = "edge_group.process"; +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/EdgeRunnerDeliveryStatus.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/EdgeRunnerDeliveryStatus.cs new file mode 100644 index 0000000000..5e22f906e1 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/EdgeRunnerDeliveryStatus.cs @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.Workflows.Observability; + +internal enum EdgeRunnerDeliveryStatus +{ + Delivered, + DroppedTypeMismatch, + DroppedTargetMismatch, + DroppedConditionFalse, + Exception, + Buffered +} + +internal static class EdgeRunnerDeliveryStatusExtensions +{ + public static string ToStringValue(this EdgeRunnerDeliveryStatus status) + { + return status switch + { + EdgeRunnerDeliveryStatus.Delivered => "delivered", + EdgeRunnerDeliveryStatus.DroppedTypeMismatch => "dropped type mismatch", + EdgeRunnerDeliveryStatus.DroppedTargetMismatch => "dropped target mismatch", + EdgeRunnerDeliveryStatus.DroppedConditionFalse => "dropped condition false", + EdgeRunnerDeliveryStatus.Exception => "exception", + EdgeRunnerDeliveryStatus.Buffered => "buffered", + _ => throw new System.NotImplementedException(), + }; + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/EventNames.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/EventNames.cs new file mode 100644 index 0000000000..8b9f5bbde8 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/EventNames.cs @@ -0,0 +1,14 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.Workflows.Observability; + +internal static class EventNames +{ + public const string BuildStarted = "build.started"; + public const string BuildValidationCompleted = "build.validation_completed"; + public const string BuildCompleted = "build.completed"; + public const string BuildError = "build.error"; + public const string WorkflowStarted = "workflow.started"; + public const string WorkflowCompleted = "workflow.completed"; + public const string WorkflowError = "workflow.error"; +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs new file mode 100644 index 0000000000..81c0a3570c --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs @@ -0,0 +1,21 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.Workflows.Observability; + +internal static class Tags +{ + public const string WorkflowId = "workflow.id"; + public const string WorkflowDefinition = "workflow.definition"; + public const string BuildErrorMessage = "build.error.message"; + public const string BuildErrorType = "build.error.type"; + public const string ErrorType = "error.type"; + public const string RunId = "run.id"; + public const string ExecutorId = "executor.id"; + public const string ExecutorType = "executor.type"; + public const string MessageType = "message.type"; + public const string EdgeGroupType = "edge_group.type"; + public const string MessageSourceId = "message.source_id"; + public const string MessageTargetId = "message.target_id"; + public const string EdgeGroupDelivered = "edge_group.delivered"; + public const string EdgeGroupDeliveryStatus = "edge_group.delivery_status"; +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/StreamingRun.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/StreamingRun.cs index 5b325d8e8b..035f25bdfc 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/StreamingRun.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/StreamingRun.cs @@ -2,11 +2,13 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Microsoft.Agents.AI.Workflows.Execution; +using Microsoft.Agents.AI.Workflows.Observability; using Microsoft.Shared.Diagnostics; namespace Microsoft.Agents.AI.Workflows; @@ -20,6 +22,9 @@ public sealed class StreamingRun private TaskCompletionSource? _waitForResponseSource; private readonly ISuperStepRunner _stepRunner; + private static readonly string s_namespace = typeof(StreamingRun).Namespace!; + private static readonly ActivitySource s_activitySource = new(s_namespace); + /// /// Gets a value indicating whether there are any outstanding s for which a /// has not been sent. @@ -95,12 +100,33 @@ public sealed class StreamingRun this._stepRunner.WorkflowEvent += OnWorkflowEvent; + using Activity? activity = s_activitySource.StartActivity(ActivityNames.WorkflowRun); + activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.RunId, this.RunId); + try { + activity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted)); do { + // Because we may be yielding out of this function, we need to ensure that the Activity.Current + // is set to our activity for the duration of this loop iteration. + Activity.Current = activity; + // Drain SuperSteps while there are steps to run - await this._stepRunner.RunSuperStepAsync(cancellationToken).ConfigureAwait(false); + try + { + await this._stepRunner.RunSuperStepAsync(cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) when (activity is not null) + { + activity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() { + { Tags.ErrorType, ex.GetType().FullName }, + { Tags.BuildErrorMessage, ex.Message }, + })); + activity.CaptureException(ex); + throw; + } + if (cancellationToken.IsCancellationRequested) { yield break; // Exit if cancellation is requested @@ -146,6 +172,8 @@ public sealed class StreamingRun this._waitForResponseSource = null; } } while (this._stepRunner.HasUnprocessedMessages); + + activity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); } finally { diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs index ca56e70527..cab8d83823 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowBuilder.cs @@ -2,9 +2,12 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using Microsoft.Agents.AI.Workflows.Observability; using Microsoft.Shared.Diagnostics; namespace Microsoft.Agents.AI.Workflows; @@ -34,6 +37,9 @@ public class WorkflowBuilder private readonly string _startExecutorId; + private static readonly string s_namespace = typeof(WorkflowBuilder).Namespace!; + private static readonly ActivitySource s_activitySource = new(s_namespace); + /// /// Initializes a new instance of the WorkflowBuilder class with the specified starting executor. /// @@ -331,6 +337,57 @@ public class WorkflowBuilder // TODO: This is likely a pipe-dream, but can we do any type-checking on the edges? (Not without instantiating the executors...) } + private Workflow BuildInternal(Activity? activity = null) + { + activity?.AddEvent(new ActivityEvent(EventNames.BuildStarted)); + + try + { + this.Validate(); + } + catch (Exception ex) when (activity is not null) + { + activity.AddEvent(new ActivityEvent(EventNames.BuildError, tags: new() { + { Tags.BuildErrorMessage, ex.Message }, + { Tags.BuildErrorType, ex.GetType().FullName } + })); + activity.CaptureException(ex); + throw; + } + + activity?.AddEvent(new ActivityEvent(EventNames.BuildValidationCompleted)); + + var workflow = new Workflow(this._startExecutorId) + { + Registrations = this._executors, + Edges = this._edges, + Ports = this._inputPorts, + OutputExecutors = this._outputExecutors + }; + + // Using the start executor ID as a proxy for the workflow ID + activity?.SetTag(Tags.WorkflowId, workflow.StartExecutorId); + if (activity is not null) + { + var workflowJsonDefinitionData = new WorkflowJsonDefinitionData + { + StartExecutorId = this._startExecutorId, + Edges = this._edges.Values.SelectMany(e => e), + Ports = this._inputPorts.Values, + OutputExecutors = this._outputExecutors + }; + activity.SetTag( + Tags.WorkflowDefinition, + JsonSerializer.Serialize( + workflowJsonDefinitionData, + WorkflowJsonDefinitionJsonContext.Default.WorkflowJsonDefinitionData + ) + ); + } + + return workflow; + } + /// /// Builds and returns a workflow instance. /// @@ -338,15 +395,13 @@ public class WorkflowBuilder /// or if the start executor is not bound. public Workflow Build() { - this.Validate(); + using Activity? activity = s_activitySource.StartActivity(ActivityNames.WorkflowBuild); - return new Workflow(this._startExecutorId) - { - Registrations = this._executors, - Edges = this._edges, - Ports = this._inputPorts, - OutputExecutors = this._outputExecutors - }; + var workflow = this.BuildInternal(activity); + + activity?.AddEvent(new ActivityEvent(EventNames.BuildCompleted)); + + return workflow; } /// @@ -356,16 +411,26 @@ public class WorkflowBuilder /// Thrown if the built workflow cannot process messages of the specified input type, public async ValueTask> BuildAsync() where TInput : notnull { - Workflow? maybeWorkflow = await this.Build() + using Activity? activity = s_activitySource.StartActivity(ActivityNames.WorkflowBuild); + + Workflow? maybeWorkflow = await this.BuildInternal(activity) .TryPromoteAsync() .ConfigureAwait(false); if (maybeWorkflow is null) { - throw new InvalidOperationException( + var exception = new InvalidOperationException( $"The built workflow cannot process input of type '{typeof(TInput).FullName}'."); + activity?.AddEvent(new ActivityEvent(EventNames.BuildError, tags: new() { + { Tags.BuildErrorMessage, exception.Message }, + { Tags.BuildErrorType, exception.GetType().FullName } + })); + activity?.CaptureException(exception); + throw exception; } + activity?.AddEvent(new ActivityEvent(EventNames.BuildCompleted)); + return maybeWorkflow; } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowJsonDefintion.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowJsonDefintion.cs new file mode 100644 index 0000000000..5adc952580 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowJsonDefintion.cs @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using System.Text.Json.Serialization; + +namespace Microsoft.Agents.AI.Workflows; + +[JsonSourceGenerationOptions(UseStringEnumConverter = true)] +[JsonSerializable(typeof(WorkflowJsonDefinitionData))] +internal partial class WorkflowJsonDefinitionJsonContext : JsonSerializerContext +{ +} + +internal class WorkflowJsonDefinitionData +{ + public string StartExecutorId { get; set; } = string.Empty; + public IEnumerable Edges { get; set; } = []; + public IEnumerable Ports { get; set; } = []; + public IEnumerable OutputExecutors { get; set; } = []; +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs index 75a8bd6af0..6b883ebd1e 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs @@ -149,6 +149,8 @@ public class SpecializedExecutorSmokeTests return default; } + + public IReadOnlyDictionary? TraceContext => null; } [Fact] diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs index c182a2773a..4dfeb595f0 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/TestRunContext.cs @@ -9,7 +9,10 @@ namespace Microsoft.Agents.AI.Workflows.UnitTests; public class TestRunContext : IRunnerContext { - private sealed class BoundContext(string executorId, TestRunContext runnerContext) : IWorkflowContext + private sealed class BoundContext( + string executorId, + TestRunContext runnerContext, + IReadOnlyDictionary? traceContext) : IWorkflowContext { public ValueTask AddEventAsync(WorkflowEvent workflowEvent) => runnerContext.AddEventAsync(workflowEvent); @@ -34,6 +37,8 @@ public class TestRunContext : IRunnerContext public ValueTask SendMessageAsync(object message, string? targetId = null) => runnerContext.SendMessageAsync(executorId, message, targetId); + + public IReadOnlyDictionary? TraceContext => traceContext; } public List Events { get; } = []; @@ -44,7 +49,8 @@ public class TestRunContext : IRunnerContext return default; } - public IWorkflowContext Bind(string executorId) => new BoundContext(executorId, this); + public IWorkflowContext Bind(string executorId, Dictionary? traceContext = null) + => new BoundContext(executorId, this, traceContext); public List ExternalRequests { get; } = []; public ValueTask PostAsync(ExternalRequest request)