.NET: Workflow observability (#959)

* Add basic Workflow telemetry

* Add sample

* Add source propagation for executor spans

* Fix tests and address comments

* Fix formatting

* Fix declarative unit tests

* Address comments

* Remove Microsoft.Extensions.AI.Agents.EnableTelemetry

* Formatting

* Formatting

* Formatting

* fix solution

* Address comments

* Add workflow json definition for serialization

* Formatting

* Address comments
This commit is contained in:
Tao Chen
2025-09-30 16:40:04 -07:00
committed by GitHub
Unverified
parent 2539282d30
commit 042099009f
29 changed files with 600 additions and 74 deletions
@@ -139,14 +139,6 @@ using var telemetryAgent = baseAgent.WithOpenTelemetry();
var response = await telemetryAgent.RunAsync(messages);
```
### Integration with AppContext Switch
The implementation integrates with the standard .NET telemetry enablement pattern:
```csharp
AppContext.SetSwitch("Microsoft.Extensions.AI.Agents.EnableTelemetry", true);
```
### Relationship to Microsoft.Extensions.AI
This implementation follows the exact patterns established by Microsoft.Extensions.AI's OpenTelemetry instrumentation, ensuring consistency across the AI ecosystem and leveraging proven patterns for telemetry integration.
+1
View File
@@ -24,6 +24,7 @@
<PackageVersion Include="Microsoft.Extensions.AI.AzureAIInference" Version="9.9.1-preview.1.25474.6" />
<PackageVersion Include="Microsoft.Extensions.Http.Resilience" Version="9.9.0" />
<PackageVersion Include="Microsoft.Extensions.ServiceDiscovery" Version="$(AspireAppHostSdkVersion)" />
<PackageVersion Include="OpenTelemetry.Api" Version="1.12.0" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.12.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.12.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.Http" Version="1.12.0" />
+3
View File
@@ -107,6 +107,9 @@
<Folder Name="/Samples/GettingStarted/Workflows/HumanInTheLoop/">
<Project Path="samples/GettingStarted/Workflows/HumanInTheLoop/HumanInTheLoopBasic/HumanInTheLoopBasic.csproj" />
</Folder>
<Folder Name="/Samples/GettingStarted/Workflows/Observability/">
<Project Path="samples/GettingStarted/Workflows/Observability/AspireDashboard/AspireDashboard.csproj" />
</Folder>
<Folder Name="/Samples/GettingStarted/Workflows/Visualization/">
<Project Path="samples/GettingStarted/Workflows/Visualization/Visualization.csproj" Id="99bf0bc6-2440-428e-b3e7-d880e4b7a5fd" />
</Folder>
@@ -0,0 +1,22 @@
<!-- Console executable project for an OpenTelemetry-enabled workflow sample. -->
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net9.0</TargetFramework>
<Nullable>enable</Nullable>
<!-- Implicit usings are disabled, so the sample's source files must list every namespace they use. -->
<ImplicitUsings>disable</ImplicitUsings>
</PropertyGroup>
<!-- Telemetry packages. No Version attributes are given here; presumably versions come from central package management (TODO confirm). -->
<ItemGroup>
<PackageReference Include="OpenTelemetry" />
<PackageReference Include="OpenTelemetry.Exporter.Console" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
</ItemGroup>
<!-- The workflow runtime is referenced as a project (built from source), not as a package. -->
<ItemGroup>
<ProjectReference Include="..\..\..\..\..\src\Microsoft.Agents.AI.Workflows\Microsoft.Agents.AI.Workflows.csproj" />
</ItemGroup>
</Project>
@@ -0,0 +1,101 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;
using OpenTelemetry;
using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
namespace WorkflowObservabilitySample;
/// <summary>
/// Sample entry point demonstrating workflow observability: traces produced while a
/// workflow runs are exported over OTLP so they can be inspected in Aspire Dashboard.
///
/// The workflow is a two-stage text pipeline:
/// 1. An UppercaseExecutor converts the input text to uppercase.
/// 2. A ReverseTextExecutor reverses the uppercase text.
///
/// The stages are wired one after the other, so each message flows through them in order.
/// Given "Hello, World!", the pipeline yields "!DLROW ,OLLEH".
/// </summary>
public static class Program
{
    private const string SourceName = "Workflow.Sample";
    private static readonly ActivitySource s_activitySource = new(SourceName);

    /// <summary>
    /// Sets up tracing, runs the workflow once, and prints the data of every completed executor.
    /// </summary>
    private static async Task Main()
    {
        // The OTLP endpoint can be overridden through the OTLP_ENDPOINT environment variable.
        string otlpEndpoint = Environment.GetEnvironmentVariable("OTLP_ENDPOINT") ?? "http://localhost:4317";
        ResourceBuilder serviceResource = ResourceBuilder.CreateDefault().AddService("WorkflowSample");

        // Listen to the workflow library's activity sources as well as this sample's own source.
        using var tracerProvider = Sdk.CreateTracerProviderBuilder()
            .SetResourceBuilder(serviceResource)
            .AddSource("Microsoft.Agents.AI.Workflows*")
            .AddSource(SourceName)
            .AddOtlpExporter(options => options.Endpoint = new Uri(otlpEndpoint))
            .Build();

        // Root activity for the whole application run.
        using Activity? rootActivity = s_activitySource.StartActivity("main");
        Console.WriteLine($"Operation/Trace ID: {Activity.Current?.TraceId}");

        // Pipeline stages.
        UppercaseExecutor toUpper = new();
        ReverseTextExecutor toReversed = new();

        // Wire the stages sequentially: uppercase first, then reverse.
        WorkflowBuilder builder = new(toUpper);
        var pipeline = builder.AddEdge(toUpper, toReversed).Build();

        // Run the workflow and report what each executor produced.
        Run run = await InProcessExecution.RunAsync(pipeline, "Hello, World!");
        foreach (ExecutorCompletedEvent completed in run.NewEvents.OfType<ExecutorCompletedEvent>())
        {
            Console.WriteLine($"{completed.ExecutorId}: {completed.Data}");
        }
    }
}
/// <summary>
/// First executor: converts input text to uppercase.
/// </summary>
internal sealed class UppercaseExecutor() : ReflectingExecutor<UppercaseExecutor>("UppercaseExecutor"), IMessageHandler<string, string>
{
    /// <summary>
    /// Processes the input message by converting it to uppercase (invariant culture).
    /// </summary>
    /// <param name="message">The input text to convert</param>
    /// <param name="context">Workflow context; not used by this handler</param>
    /// <returns>An already-completed task holding the input text converted to uppercase</returns>
    public ValueTask<string> HandleAsync(string message, IWorkflowContext context)
    {
        // Nothing is awaited here, so the result is wrapped in a completed ValueTask directly
        // rather than declaring the method 'async' (which would raise CS1998 and build an
        // unnecessary state machine).
        // The return value will be sent as a message along an edge to subsequent executors.
        return new ValueTask<string>(message.ToUpperInvariant());
    }
}
/// <summary>
/// Second executor: reverses the input text. It is the last stage of the sample pipeline.
/// </summary>
internal sealed class ReverseTextExecutor() : ReflectingExecutor<ReverseTextExecutor>("ReverseTextExecutor"), IMessageHandler<string, string>
{
    /// <summary>
    /// Processes the input message by reversing the text.
    /// </summary>
    /// <remarks>
    /// The reversal works on UTF-16 code units (<see cref="char"/> values). That is fine for the
    /// sample's ASCII input, but text containing surrogate pairs or combining sequences would
    /// not be reversed per user-perceived character.
    /// </remarks>
    /// <param name="message">The input text to reverse</param>
    /// <param name="context">Workflow context; not used by this handler</param>
    /// <returns>An already-completed task holding the input text reversed</returns>
    public ValueTask<string> HandleAsync(string message, IWorkflowContext context)
    {
        // Nothing is awaited here, so the method is not 'async' (avoids CS1998 and the
        // unnecessary state machine); the result is returned as a completed ValueTask.
        char[] characters = message.ToCharArray();
        Array.Reverse(characters);
        return new ValueTask<string>(new string(characters));
    }
}
@@ -29,6 +29,7 @@ internal sealed class DeclarativeWorkflowContext : IWorkflowContext
private IWorkflowContext Source { get; }
public WorkflowFormulaState State { get; }
public IReadOnlyDictionary<string, string>? TraceContext => this.Source.TraceContext;
/// <inheritdoc/>
public ValueTask AddEventAsync(WorkflowEvent workflowEvent) => this.Source.AddEventAsync(workflowEvent);
@@ -1,36 +1,54 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Observability;
namespace Microsoft.Agents.AI.Workflows.Execution;
internal sealed class DirectEdgeRunner(IRunnerContext runContext, DirectEdgeData edgeData) :
EdgeRunner<DirectEdgeData>(runContext, edgeData)
{
public IWorkflowContext WorkflowContext { get; } = runContext.Bind(edgeData.SinkId);
private async ValueTask<Executor> FindRouterAsync(IStepTracer? tracer) => await this.RunContext.EnsureExecutorAsync(this.EdgeData.SinkId, tracer)
.ConfigureAwait(false);
protected internal override async ValueTask<DeliveryMapping?> ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer)
{
using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess);
activity?
.SetTag(Tags.EdgeGroupType, nameof(DirectEdgeRunner))
.SetTag(Tags.MessageSourceId, this.EdgeData.SourceId)
.SetTag(Tags.MessageTargetId, this.EdgeData.SinkId);
if (envelope.TargetId is not null && this.EdgeData.SinkId != envelope.TargetId)
{
activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedTargetMismatch);
return null;
}
object message = envelope.Message;
if (this.EdgeData.Condition is not null && !this.EdgeData.Condition(message))
try
{
return null;
}
Executor target = await this.FindRouterAsync(stepTracer).ConfigureAwait(false);
if (target.CanHandle(envelope.MessageType))
{
return new DeliveryMapping(envelope, target);
if (this.EdgeData.Condition is not null && !this.EdgeData.Condition(message))
{
activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedConditionFalse);
return null;
}
Executor target = await this.FindRouterAsync(stepTracer).ConfigureAwait(false);
if (target.CanHandle(envelope.MessageType))
{
activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Delivered);
return new DeliveryMapping(envelope, target);
}
}
catch (Exception) when (activity is not null)
{
activity.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Exception);
throw;
}
activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedTypeMismatch);
return null;
}
}
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Diagnostics;
using System.Threading.Tasks;
using Microsoft.Shared.Diagnostics;
@@ -13,6 +14,9 @@ internal interface IStatefulEdgeRunner
internal abstract class EdgeRunner
{
protected static readonly string s_namespace = typeof(EdgeRunner).Namespace!;
protected static readonly ActivitySource s_activitySource = new(s_namespace);
// TODO: Can this be sync?
protected internal abstract ValueTask<DeliveryMapping?> ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer);
}
@@ -5,6 +5,7 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Observability;
namespace Microsoft.Agents.AI.Workflows.Execution;
@@ -18,8 +19,14 @@ internal sealed class FanInEdgeRunner(IRunnerContext runContext, FanInEdgeData e
{
Debug.Assert(!envelope.IsExternal, "FanIn edges should never be chased from external input");
using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess);
activity?
.SetTag(Tags.EdgeGroupType, nameof(FanInEdgeRunner))
.SetTag(Tags.MessageTargetId, this.EdgeData.SinkId);
if (envelope.TargetId is not null && this.EdgeData.SinkId != envelope.TargetId)
{
activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedTargetMismatch);
return null;
}
@@ -28,14 +35,30 @@ internal sealed class FanInEdgeRunner(IRunnerContext runContext, FanInEdgeData e
if (releasedMessages is null)
{
// Not ready to process yet.
activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Buffered);
return null;
}
// TODO: Filter messages based on accepted input types?
Executor target = await this.RunContext.EnsureExecutorAsync(this.EdgeData.SinkId, stepTracer)
.ConfigureAwait(false);
try
{
// TODO: Filter messages based on accepted input types?
Executor target = await this.RunContext.EnsureExecutorAsync(this.EdgeData.SinkId, stepTracer)
.ConfigureAwait(false);
// Materialize the filtered list via ToList() to avoid multiple enumerations
var finalReleasedMessages = releasedMessages.Where(envelope => target.CanHandle(envelope.MessageType)).ToList();
if (finalReleasedMessages.Count == 0)
{
activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedTypeMismatch);
return null;
}
return new DeliveryMapping(releasedMessages.Where(envelope => target.CanHandle(envelope.MessageType)), target);
return new DeliveryMapping(finalReleasedMessages, target);
}
catch (Exception) when (activity is not null)
{
activity.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Exception);
throw;
}
}
public ValueTask<PortableValue> ExportStateAsync()
@@ -1,40 +1,61 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Observability;
namespace Microsoft.Agents.AI.Workflows.Execution;
internal sealed class FanOutEdgeRunner(IRunnerContext runContext, FanOutEdgeData edgeData) :
EdgeRunner<FanOutEdgeData>(runContext, edgeData)
{
private Dictionary<string, IWorkflowContext> BoundContexts { get; }
= edgeData.SinkIds.ToDictionary(
sinkId => sinkId,
runContext.Bind);
protected internal override async ValueTask<DeliveryMapping?> ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer)
{
using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess);
activity?
.SetTag(Tags.EdgeGroupType, nameof(FanOutEdgeRunner))
.SetTag(Tags.MessageSourceId, this.EdgeData.SourceId);
object message = envelope.Message;
IEnumerable<string> targetIds =
this.EdgeData.EdgeAssigner is null
? this.EdgeData.SinkIds
: this.EdgeData.EdgeAssigner(message, this.BoundContexts.Count)
.Select(i => this.EdgeData.SinkIds[i]);
Executor[] result = await Task.WhenAll(targetIds.Where(IsValidTarget)
.Select(tid => this.RunContext.EnsureExecutorAsync(tid, stepTracer)
.AsTask()))
.ConfigureAwait(false);
if (result.Length == 0)
try
{
return null;
}
IEnumerable<string> targetIds =
this.EdgeData.EdgeAssigner is null
? this.EdgeData.SinkIds
: this.EdgeData.EdgeAssigner(message, this.EdgeData.SinkIds.Count)
.Select(i => this.EdgeData.SinkIds[i]);
IEnumerable<Executor> validTargets = result.Where(t => t.CanHandle(envelope.MessageType));
return new DeliveryMapping(envelope, validTargets);
Executor[] result = await Task.WhenAll(targetIds.Where(IsValidTarget)
.Select(tid => this.RunContext.EnsureExecutorAsync(tid, stepTracer)
.AsTask()))
.ConfigureAwait(false);
if (result.Length == 0)
{
activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedTargetMismatch);
return null;
}
IEnumerable<Executor> validTargets = result.Where(t => t.CanHandle(envelope.MessageType));
if (!validTargets.Any())
{
activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedTypeMismatch);
return null;
}
activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Delivered);
return new DeliveryMapping(envelope, validTargets);
}
catch (Exception) when (activity is not null)
{
activity.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Exception);
throw;
}
bool IsValidTarget(string targetId)
{
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Microsoft.Agents.AI.Workflows.Execution;
@@ -10,6 +11,6 @@ internal interface IRunnerContext : IExternalRequestSink
ValueTask SendMessageAsync(string sourceId, object message, string? targetId = null);
ValueTask<StepContext> AdvanceAsync();
IWorkflowContext Bind(string executorId);
IWorkflowContext Bind(string executorId, Dictionary<string, string>? traceContext = null);
ValueTask<Executor> EnsureExecutorAsync(string executorId, IStepTracer? tracer);
}
@@ -10,6 +10,8 @@ internal interface ISuperStepRunner
{
string RunId { get; }
string StartExecutorId { get; }
bool HasUnservicedRequests { get; }
bool HasUnprocessedMessages { get; }
@@ -1,7 +1,9 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Observability;
using Microsoft.Shared.Diagnostics;
namespace Microsoft.Agents.AI.Workflows.Execution;
@@ -9,8 +11,6 @@ namespace Microsoft.Agents.AI.Workflows.Execution;
internal sealed class InputEdgeRunner(IRunnerContext runContext, string sinkId)
: EdgeRunner<string>(runContext, sinkId)
{
public IWorkflowContext WorkflowContext { get; } = runContext.Bind(sinkId);
public static InputEdgeRunner ForPort(IRunnerContext runContext, InputPort port)
{
Throw.IfNull(port);
@@ -22,13 +22,30 @@ internal sealed class InputEdgeRunner(IRunnerContext runContext, string sinkId)
protected internal override async ValueTask<DeliveryMapping?> ChaseEdgeAsync(MessageEnvelope envelope, IStepTracer? stepTracer)
{
Debug.Assert(envelope.IsExternal, "Input edges should only be chased from external input");
Executor target = await this.FindExecutorAsync(stepTracer).ConfigureAwait(false);
if (target.CanHandle(envelope.MessageType))
{
return new DeliveryMapping(envelope, target);
}
return null;
using var activity = s_activitySource.StartActivity(ActivityNames.EdgeGroupProcess);
activity?
.SetTag(Tags.EdgeGroupType, nameof(InputEdgeRunner))
.SetTag(Tags.MessageSourceId, envelope.SourceId)
.SetTag(Tags.MessageTargetId, this.EdgeData);
try
{
Executor target = await this.FindExecutorAsync(stepTracer).ConfigureAwait(false);
if (target.CanHandle(envelope.MessageType))
{
activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Delivered);
return new DeliveryMapping(envelope, target);
}
activity?.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.DroppedTypeMismatch);
return null;
}
catch (Exception) when (activity is not null)
{
activity.SetEdgeRunnerDeliveryStatus(EdgeRunnerDeliveryStatus.Exception);
throw;
}
}
private async ValueTask<Executor> FindExecutorAsync(IStepTracer? tracer) => await this.RunContext.EnsureExecutorAsync(this.EdgeData, tracer).ConfigureAwait(false);
@@ -1,25 +1,37 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Agents.AI.Workflows.Checkpointing;
namespace Microsoft.Agents.AI.Workflows.Execution;
internal sealed class MessageEnvelope(object message, ExecutorIdentity source, TypeId? declaredType = null, string? targetId = null)
internal sealed class MessageEnvelope(
object message,
ExecutorIdentity source,
TypeId? declaredType = null,
string? targetId = null,
Dictionary<string, string>? traceContext = null)
{
public TypeId MessageType => declaredType ?? new(message.GetType());
public object Message => message;
public ExecutorIdentity Source => source;
public string? TargetId => targetId;
public Dictionary<string, string>? TraceContext => traceContext;
[MemberNotNullWhen(false, nameof(SourceId))]
public bool IsExternal => this.Source == ExecutorIdentity.None;
public string? SourceId => this.Source.Id;
internal MessageEnvelope(object message, ExecutorIdentity source, Type declaredType, string? targetId = null)
: this(message, source, new TypeId(declaredType), targetId)
internal MessageEnvelope(
object message,
ExecutorIdentity source,
Type declaredType,
string? targetId = null,
Dictionary<string, string>? traceContext = null) : this(message, source, new TypeId(declaredType), targetId, traceContext)
{
if (!declaredType.IsInstanceOfType(message))
{
@@ -8,6 +8,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using Microsoft.Agents.AI.Workflows.Execution;
using Microsoft.Agents.AI.Workflows.Observability;
using Microsoft.Agents.AI.Workflows.Reflection;
namespace Microsoft.Agents.AI.Workflows;
@@ -25,6 +26,9 @@ public abstract class Executor : IIdentified
private readonly ExecutorOptions _options;
private static readonly string s_namespace = typeof(Executor).Namespace!;
private static readonly ActivitySource s_activitySource = new(s_namespace);
/// <summary>
/// Initialize the executor with a unique identifier
/// </summary>
@@ -88,6 +92,12 @@ public abstract class Executor : IIdentified
/// <exception cref="TargetInvocationException">An exception is generated while handling the message.</exception>
public async ValueTask<object?> ExecuteAsync(object message, TypeId messageType, IWorkflowContext context)
{
using var activity = s_activitySource.StartActivity(ActivityNames.ExecutorProcess, ActivityKind.Internal);
activity?.SetTag(Tags.ExecutorId, this.Id)
.SetTag(Tags.ExecutorType, this.GetType().FullName)
.SetTag(Tags.MessageType, messageType.TypeName)
.CreateSourceLinks(context.TraceContext);
await context.AddEventAsync(new ExecutorInvokedEvent(this.Id, message)).ConfigureAwait(false);
CallResult? result = await this.Router.RouteMessageAsync(message, context, requireRoute: true)
@@ -92,4 +92,9 @@ public interface IWorkflowContext
/// <param name="scopeName">An optional name that specifies the scope to clear. If null, the default scope is used.</param>
/// <returns>A ValueTask that represents the asynchronous clear operation.</returns>
ValueTask QueueClearScopeAsync(string? scopeName = null);
/// <summary>
/// The trace context associated with the current message about to be processed by the executor, if any.
/// </summary>
IReadOnlyDictionary<string, string>? TraceContext { get; }
}
@@ -23,6 +23,7 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingRunner
public InProcessRunner(Workflow workflow, ICheckpointManager? checkpointManager, string? runId = null, params Type[] knownValidInputTypes)
{
this.RunId = runId ?? Guid.NewGuid().ToString("N");
this.StartExecutorId = workflow.StartExecutorId;
this.Workflow = Throw.IfNull(workflow);
this.RunContext = new InProcessRunnerContext(workflow, this.RunId, this.StepTracer);
@@ -38,6 +39,9 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingRunner
/// <inheritdoc cref="ISuperStepRunner.RunId"/>
public string RunId { get; }
/// <inheritdoc cref="ISuperStepRunner.StartExecutorId"/>
public string StartExecutorId { get; }
private readonly HashSet<Type> _knownValidInputTypes;
public async ValueTask<bool> IsValidInputTypeAsync(Type messageType)
{
@@ -209,8 +213,11 @@ internal sealed class InProcessRunner : ISuperStepRunner, ICheckpointingRunner
this.StepTracer.TraceActivated(receiverId);
foreach (MessageEnvelope envelope in envelopes)
{
await executor.ExecuteAsync(envelope.Message, envelope.MessageType, this.RunContext.Bind(receiverId))
.ConfigureAwait(false);
await executor.ExecuteAsync(
envelope.Message,
envelope.MessageType,
this.RunContext.Bind(receiverId, envelope.TraceContext)
).ConfigureAwait(false);
}
}
@@ -3,15 +3,19 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Checkpointing;
using Microsoft.Agents.AI.Workflows.Execution;
using Microsoft.Agents.AI.Workflows.Observability;
using Microsoft.Agents.AI.Workflows.Specialized;
using Microsoft.Extensions.Logging;
using Microsoft.Shared.Diagnostics;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
namespace Microsoft.Agents.AI.Workflows.InProc;
@@ -132,10 +136,25 @@ internal sealed class InProcessRunnerContext : IRunnerContext
return default;
}
private static readonly string s_namespace = typeof(IWorkflowContext).Namespace!;
private static readonly ActivitySource s_activitySource = new(s_namespace);
public async ValueTask SendMessageAsync(string sourceId, object message, string? targetId = null)
{
using Activity? activity = s_activitySource.StartActivity(ActivityNames.MessageSend, ActivityKind.Producer);
// Create a carrier for trace context propagation
var traceContext = activity is null ? null : new Dictionary<string, string>();
if (traceContext is not null)
{
// Inject the current activity context into the carrier
Propagators.DefaultTextMapPropagator.Inject(
new PropagationContext(activity?.Context ?? default, Baggage.Current),
traceContext,
(carrier, key, value) => carrier[key] = value);
}
this.CheckEnded();
MessageEnvelope envelope = new(message, sourceId, targetId: targetId);
MessageEnvelope envelope = new(message, sourceId, targetId: targetId, traceContext: traceContext);
if (this._workflow.Edges.TryGetValue(sourceId, out HashSet<Edge>? edges))
{
@@ -150,10 +169,10 @@ internal sealed class InProcessRunnerContext : IRunnerContext
}
}
public IWorkflowContext Bind(string executorId)
public IWorkflowContext Bind(string executorId, Dictionary<string, string>? traceContext = null)
{
this.CheckEnded();
return new BoundContext(this, executorId, this._outputFilter);
return new BoundContext(this, executorId, this._outputFilter, traceContext);
}
public ValueTask PostAsync(ExternalRequest request)
@@ -173,10 +192,18 @@ internal sealed class InProcessRunnerContext : IRunnerContext
internal StateManager StateManager { get; } = new();
private sealed class BoundContext(InProcessRunnerContext RunnerContext, string ExecutorId, OutputFilter outputFilter) : IWorkflowContext
private sealed class BoundContext(
InProcessRunnerContext RunnerContext,
string ExecutorId,
OutputFilter outputFilter,
Dictionary<string, string>? traceContext) : IWorkflowContext
{
public ValueTask AddEventAsync(WorkflowEvent workflowEvent) => RunnerContext.AddEventAsync(workflowEvent);
public ValueTask SendMessageAsync(object message, string? targetId = null) => RunnerContext.SendMessageAsync(ExecutorId, message, targetId);
public ValueTask SendMessageAsync(object message, string? targetId = null)
{
return RunnerContext.SendMessageAsync(ExecutorId, message, targetId);
}
public async ValueTask YieldOutputAsync(object output)
{
@@ -208,6 +235,8 @@ internal sealed class InProcessRunnerContext : IRunnerContext
public ValueTask QueueClearScopeAsync(string? scopeName = null)
=> RunnerContext.StateManager.ClearStateAsync(ExecutorId, scopeName);
public IReadOnlyDictionary<string, string>? TraceContext => traceContext;
}
internal Task PrepareForCheckpointAsync(CancellationToken cancellationToken = default)
@@ -29,6 +29,10 @@
<InternalsVisibleTo Include="Microsoft.Agents.AI.Workflows.UnitTests" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="OpenTelemetry.Api" />
</ItemGroup>
<PropertyGroup>
<!-- NuGet Package Settings -->
<Title>Microsoft Agent Framework Workflows</Title>
@@ -0,0 +1,55 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using OpenTelemetry.Context.Propagation;
namespace Microsoft.Agents.AI.Workflows.Observability;
/// <summary>
/// Helper extensions for annotating workflow <see cref="Activity"/> instances.
/// Every method is a no-op when the activity is <see langword="null"/>.
/// </summary>
internal static class ActivityExtensions
{
    /// <summary>
    /// Capture exception details in the activity.
    /// </summary>
    /// <param name="activity">The activity to capture exception details in.</param>
    /// <param name="exception">The exception to capture.</param>
    /// <remarks>
    /// This method sets the <see cref="Tags.ErrorType"/> tag to the exception's type name, adds the
    /// exception to the activity, and marks the activity status as
    /// <see cref="ActivityStatusCode.Error"/> with the exception message as description.
    /// </remarks>
    internal static void CaptureException(this Activity? activity, Exception exception)
    {
        activity?.SetTag(Tags.ErrorType, exception.GetType().FullName)
            .AddException(exception)
            .SetStatus(ActivityStatusCode.Error, exception.Message);
    }

    /// <summary>
    /// Records the outcome of an edge runner on the activity.
    /// </summary>
    /// <param name="activity">The activity to tag.</param>
    /// <param name="status">The delivery outcome to record.</param>
    /// <remarks>
    /// Two tags are written: <see cref="Tags.EdgeGroupDelivered"/> (true only for
    /// <see cref="EdgeRunnerDeliveryStatus.Delivered"/>) and
    /// <see cref="Tags.EdgeGroupDeliveryStatus"/> (the status as a string).
    /// </remarks>
    internal static void SetEdgeRunnerDeliveryStatus(this Activity? activity, EdgeRunnerDeliveryStatus status)
    {
        var delivered = status == EdgeRunnerDeliveryStatus.Delivered;
        activity?
            .SetTag(Tags.EdgeGroupDelivered, delivered)
            .SetTag(Tags.EdgeGroupDeliveryStatus, status.ToStringValue());
    }

    /// <summary>
    /// Executor processing spans are not nested, they are siblings.
    /// We use links to represent the causal relationship between them.
    /// </summary>
    /// <param name="activity">The activity that should link back to its source.</param>
    /// <param name="traceContext">The propagated trace context carrier of the message being processed, if any.</param>
    internal static void CreateSourceLinks(this Activity? activity, IReadOnlyDictionary<string, string>? traceContext)
    {
        if (activity is null || traceContext is null)
        {
            return;
        }

        // Extract the propagation context from the dictionary
        var propagationContext = Propagators.DefaultTextMapPropagator.Extract(
            default,
            traceContext,
            (carrier, key) => carrier.TryGetValue(key, out var value) ? [value] : Array.Empty<string>());

        // Extraction hands back the default context passed in above when nothing usable was
        // found (e.g. the carrier has no trace entry, or no propagator has been configured).
        // Linking to that all-zero context would attach a meaningless link, so skip it.
        if (propagationContext.ActivityContext == default)
        {
            return;
        }

        // Create a link to the source activity
        activity.AddLink(new ActivityLink(propagationContext.ActivityContext));
    }
}
@@ -0,0 +1,12 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Agents.AI.Workflows.Observability;
/// <summary>
/// Names of the activities (spans) started by the workflow library.
/// </summary>
internal static class ActivityNames
{
/// <summary>Activity name for building a workflow. NOTE(review): the start site is not visible in this view; confirm in WorkflowBuilder.</summary>
public const string WorkflowBuild = "workflow.build";
/// <summary>Activity name for a workflow run; started by StreamingRun around its super-step loop.</summary>
public const string WorkflowRun = "workflow.run";
/// <summary>Activity name for sending a message; started by InProcessRunnerContext.SendMessageAsync with ActivityKind.Producer.</summary>
public const string MessageSend = "message.send";
/// <summary>Activity name for an executor handling one message; started by Executor.ExecuteAsync.</summary>
public const string ExecutorProcess = "executor.process";
/// <summary>Activity name for an edge runner evaluating a message; started by the edge runners' ChaseEdgeAsync.</summary>
public const string EdgeGroupProcess = "edge_group.process";
}
@@ -0,0 +1,30 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Agents.AI.Workflows.Observability;
/// <summary>
/// Outcome of an edge runner processing a message, recorded on the edge-processing activity.
/// </summary>
internal enum EdgeRunnerDeliveryStatus
{
    /// <summary>The message was mapped to a target executor.</summary>
    Delivered,

    /// <summary>The message was dropped because no target can handle its type.</summary>
    DroppedTypeMismatch,

    /// <summary>The message was dropped because its intended target does not match the edge's sink(s).</summary>
    DroppedTargetMismatch,

    /// <summary>The message was dropped because the edge's condition evaluated to false.</summary>
    DroppedConditionFalse,

    /// <summary>An exception was thrown while the edge was being processed.</summary>
    Exception,

    /// <summary>The message was held back because the edge is not ready to release messages yet.</summary>
    Buffered
}

/// <summary>
/// Conversions for <see cref="EdgeRunnerDeliveryStatus"/>.
/// </summary>
internal static class EdgeRunnerDeliveryStatusExtensions
{
    /// <summary>
    /// Maps a delivery status to the lowercase text used as the delivery-status tag value.
    /// </summary>
    /// <param name="status">The status to convert.</param>
    /// <returns>The text form of <paramref name="status"/>.</returns>
    /// <exception cref="System.ArgumentOutOfRangeException">
    /// <paramref name="status"/> is not a defined <see cref="EdgeRunnerDeliveryStatus"/> value.
    /// </exception>
    public static string ToStringValue(this EdgeRunnerDeliveryStatus status)
    {
        return status switch
        {
            EdgeRunnerDeliveryStatus.Delivered => "delivered",
            EdgeRunnerDeliveryStatus.DroppedTypeMismatch => "dropped type mismatch",
            EdgeRunnerDeliveryStatus.DroppedTargetMismatch => "dropped target mismatch",
            EdgeRunnerDeliveryStatus.DroppedConditionFalse => "dropped condition false",
            EdgeRunnerDeliveryStatus.Exception => "exception",
            EdgeRunnerDeliveryStatus.Buffered => "buffered",
            // An undefined value is a bad argument, not a missing feature: report it as such,
            // including the offending value to make the failure diagnosable.
            _ => throw new System.ArgumentOutOfRangeException(nameof(status), status, "Unknown edge runner delivery status."),
        };
    }
}
@@ -0,0 +1,14 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Agents.AI.Workflows.Observability;
/// <summary>
/// Names of the events added to workflow build and workflow run activities.
/// </summary>
internal static class EventNames
{
/// <summary>Added at the start of WorkflowBuilder.BuildInternal, before validation.</summary>
public const string BuildStarted = "build.started";
/// <summary>NOTE(review): presumably added once build validation succeeds; the emit site is not visible in this view.</summary>
public const string BuildValidationCompleted = "build.validation_completed";
/// <summary>NOTE(review): presumably added when the build finishes; the emit site is not visible in this view.</summary>
public const string BuildCompleted = "build.completed";
/// <summary>Added when validation throws during a build; carries the error message and error type as tags.</summary>
public const string BuildError = "build.error";
/// <summary>Added by StreamingRun before its super-step loop begins.</summary>
public const string WorkflowStarted = "workflow.started";
/// <summary>Added by StreamingRun after its super-step loop ends normally.</summary>
public const string WorkflowCompleted = "workflow.completed";
/// <summary>Added by StreamingRun when running a super step throws; carries the error type and message as tags.</summary>
public const string WorkflowError = "workflow.error";
}
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Agents.AI.Workflows.Observability;
/// <summary>
/// Tag (attribute) keys used on workflow activities and activity events.
/// </summary>
internal static class Tags
{
/// <summary>Workflow identifier. StreamingRun currently fills this with the start executor id.</summary>
public const string WorkflowId = "workflow.id";
/// <summary>NOTE(review): presumably a serialized workflow definition; the code that sets it is not visible in this view.</summary>
public const string WorkflowDefinition = "workflow.definition";
/// <summary>Exception message on a build error event. NOTE(review): StreamingRun also uses this key on its workflow error event; confirm that is intended.</summary>
public const string BuildErrorMessage = "build.error.message";
/// <summary>Full type name of the exception on a build error event.</summary>
public const string BuildErrorType = "build.error.type";
/// <summary>Full type name of a captured exception.</summary>
public const string ErrorType = "error.type";
/// <summary>Identifier of the workflow run.</summary>
public const string RunId = "run.id";
/// <summary>Id of the executor processing a message.</summary>
public const string ExecutorId = "executor.id";
/// <summary>Full type name of the executor processing a message.</summary>
public const string ExecutorType = "executor.type";
/// <summary>Type name of the message being processed.</summary>
public const string MessageType = "message.type";
/// <summary>Name of the edge runner class handling the message (e.g. DirectEdgeRunner).</summary>
public const string EdgeGroupType = "edge_group.type";
/// <summary>Id of the source the message comes from.</summary>
public const string MessageSourceId = "message.source_id";
/// <summary>Id of the sink the edge delivers to.</summary>
public const string MessageTargetId = "message.target_id";
/// <summary>Boolean: true only when the delivery status is Delivered.</summary>
public const string EdgeGroupDelivered = "edge_group.delivered";
/// <summary>Text form of the EdgeRunnerDeliveryStatus recorded for the edge.</summary>
public const string EdgeGroupDeliveryStatus = "edge_group.delivery_status";
}
@@ -2,11 +2,13 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Execution;
using Microsoft.Agents.AI.Workflows.Observability;
using Microsoft.Shared.Diagnostics;
namespace Microsoft.Agents.AI.Workflows;
@@ -20,6 +22,9 @@ public sealed class StreamingRun
private TaskCompletionSource<object>? _waitForResponseSource;
private readonly ISuperStepRunner _stepRunner;
private static readonly string s_namespace = typeof(StreamingRun).Namespace!;
private static readonly ActivitySource s_activitySource = new(s_namespace);
/// <summary>
/// Gets a value indicating whether there are any outstanding <see cref="ExternalRequest"/>s for which a
/// <see cref="ExternalResponse"/> has not been sent.
@@ -95,12 +100,33 @@ public sealed class StreamingRun
this._stepRunner.WorkflowEvent += OnWorkflowEvent;
using Activity? activity = s_activitySource.StartActivity(ActivityNames.WorkflowRun);
activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.RunId, this.RunId);
try
{
activity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));
do
{
// Because we may be yielding out of this function, we need to ensure that the Activity.Current
// is set to our activity for the duration of this loop iteration.
Activity.Current = activity;
// Drain SuperSteps while there are steps to run
await this._stepRunner.RunSuperStepAsync(cancellationToken).ConfigureAwait(false);
try
{
await this._stepRunner.RunSuperStepAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception ex) when (activity is not null)
{
activity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
{ Tags.ErrorType, ex.GetType().FullName },
{ Tags.BuildErrorMessage, ex.Message },
}));
activity.CaptureException(ex);
throw;
}
if (cancellationToken.IsCancellationRequested)
{
yield break; // Exit if cancellation is requested
@@ -146,6 +172,8 @@ public sealed class StreamingRun
this._waitForResponseSource = null;
}
} while (this._stepRunner.HasUnprocessedMessages);
activity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
}
finally
{
@@ -2,9 +2,12 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Observability;
using Microsoft.Shared.Diagnostics;
namespace Microsoft.Agents.AI.Workflows;
@@ -34,6 +37,9 @@ public class WorkflowBuilder
private readonly string _startExecutorId;
private static readonly string s_namespace = typeof(WorkflowBuilder).Namespace!;
private static readonly ActivitySource s_activitySource = new(s_namespace);
/// <summary>
/// Initializes a new instance of the WorkflowBuilder class with the specified starting executor.
/// </summary>
@@ -331,6 +337,57 @@ public class WorkflowBuilder
// TODO: This is likely a pipe-dream, but can we do any type-checking on the edges? (Not without instantiating the executors...)
}
/// <summary>
/// Validates the builder state and creates the <see cref="Workflow"/>, recording build
/// telemetry on <paramref name="activity"/> when one is supplied.
/// </summary>
/// <param name="activity">
/// The activity that receives build events and tags. When <see langword="null"/>, no
/// telemetry is recorded and validation failures propagate without being intercepted.
/// </param>
/// <returns>The workflow assembled from the builder's executors, edges, ports and outputs.</returns>
private Workflow BuildInternal(Activity? activity = null)
{
    activity?.AddEvent(new ActivityEvent(EventNames.BuildStarted));

    try
    {
        this.Validate();
    }
    catch (Exception validationError) when (activity is not null)
    {
        // Record the failure on the activity, then rethrow with the original stack trace.
        ActivityTagsCollection errorTags = new()
        {
            { Tags.BuildErrorMessage, validationError.Message },
            { Tags.BuildErrorType, validationError.GetType().FullName }
        };
        activity.AddEvent(new ActivityEvent(EventNames.BuildError, tags: errorTags));
        activity.CaptureException(validationError);
        throw;
    }

    activity?.AddEvent(new ActivityEvent(EventNames.BuildValidationCompleted));

    Workflow built = new(this._startExecutorId)
    {
        Registrations = this._executors,
        Edges = this._edges,
        Ports = this._inputPorts,
        OutputExecutors = this._outputExecutors
    };

    // Without an activity there is nothing left to record.
    if (activity is null)
    {
        return built;
    }

    // Using the start executor ID as a proxy for the workflow ID
    activity.SetTag(Tags.WorkflowId, built.StartExecutorId);

    // Attach a JSON rendering of the workflow definition to the activity.
    WorkflowJsonDefinitionData definition = new()
    {
        StartExecutorId = this._startExecutorId,
        Edges = this._edges.Values.SelectMany(e => e),
        Ports = this._inputPorts.Values,
        OutputExecutors = this._outputExecutors
    };
    string definitionJson = JsonSerializer.Serialize(
        definition,
        WorkflowJsonDefinitionJsonContext.Default.WorkflowJsonDefinitionData);
    activity.SetTag(Tags.WorkflowDefinition, definitionJson);

    return built;
}
/// <summary>
/// Builds and returns a workflow instance.
/// </summary>
@@ -338,15 +395,13 @@ public class WorkflowBuilder
/// or if the start executor is not bound.</exception>
public Workflow Build()
{
    // The activity spans the whole build; it is null when nothing is listening to the source.
    using Activity? activity = s_activitySource.StartActivity(ActivityNames.WorkflowBuild);

    // BuildInternal performs validation itself, so Validate() must not be called here as well,
    // and there must be no early return ahead of it that would bypass the telemetry below.
    Workflow workflow = this.BuildInternal(activity);

    activity?.AddEvent(new ActivityEvent(EventNames.BuildCompleted));

    return workflow;
}
/// <summary>
@@ -356,16 +411,26 @@ public class WorkflowBuilder
/// <exception cref="InvalidOperationException">Thrown if the built workflow cannot process messages of the specified input type,</exception>
public async ValueTask<Workflow<TInput>> BuildAsync<TInput>() where TInput : notnull
{
    // The activity spans validation, construction and the promotion to Workflow<TInput>.
    using Activity? activity = s_activitySource.StartActivity(ActivityNames.WorkflowBuild);

    // Go through BuildInternal rather than Build() so that only one build activity is started
    // and the completion event is emitted once, after the promotion outcome is known.
    Workflow<TInput>? maybeWorkflow = await this.BuildInternal(activity)
                                                .TryPromoteAsync<TInput>()
                                                .ConfigureAwait(false);

    if (maybeWorkflow is null)
    {
        // Create the exception first so the same instance is recorded on the activity and thrown.
        var exception = new InvalidOperationException(
            $"The built workflow cannot process input of type '{typeof(TInput).FullName}'.");

        activity?.AddEvent(new ActivityEvent(EventNames.BuildError, tags: new() {
            { Tags.BuildErrorMessage, exception.Message },
            { Tags.BuildErrorType, exception.GetType().FullName }
        }));
        activity?.CaptureException(exception);

        throw exception;
    }

    activity?.AddEvent(new ActivityEvent(EventNames.BuildCompleted));

    return maybeWorkflow;
}
}
@@ -0,0 +1,20 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;
using System.Text.Json.Serialization;
namespace Microsoft.Agents.AI.Workflows;
/// <summary>
/// <see cref="JsonSerializerContext"/> used to serialize <see cref="WorkflowJsonDefinitionData"/>.
/// </summary>
/// <remarks>
/// Configured with <c>UseStringEnumConverter = true</c>. The class body is intentionally empty:
/// it is declared <c>partial</c> and annotated with <see cref="JsonSerializableAttribute"/>,
/// presumably so the System.Text.Json source generator can supply the implementation.
/// </remarks>
[JsonSourceGenerationOptions(UseStringEnumConverter = true)]
[JsonSerializable(typeof(WorkflowJsonDefinitionData))]
internal partial class WorkflowJsonDefinitionJsonContext : JsonSerializerContext
{
}
/// <summary>
/// Data holder describing a workflow definition for JSON serialization through
/// <see cref="WorkflowJsonDefinitionJsonContext"/>.
/// </summary>
internal class WorkflowJsonDefinitionData
{
    /// <summary>Gets or sets the ID of the workflow's start executor. Defaults to an empty string.</summary>
    public string StartExecutorId { get; set; } = string.Empty;
    /// <summary>Gets or sets the workflow's edges. Defaults to an empty sequence.</summary>
    public IEnumerable<Edge> Edges { get; set; } = [];
    /// <summary>Gets or sets the workflow's input ports. Defaults to an empty sequence.</summary>
    public IEnumerable<InputPort> Ports { get; set; } = [];
    /// <summary>Gets or sets the output executor entries (strings; presumably executor IDs). Defaults to an empty sequence.</summary>
    public IEnumerable<string> OutputExecutors { get; set; } = [];
}
@@ -149,6 +149,8 @@ public class SpecializedExecutorSmokeTests
return default;
}
public IReadOnlyDictionary<string, string>? TraceContext => null;
}
[Fact]
@@ -9,7 +9,10 @@ namespace Microsoft.Agents.AI.Workflows.UnitTests;
public class TestRunContext : IRunnerContext
{
private sealed class BoundContext(string executorId, TestRunContext runnerContext) : IWorkflowContext
private sealed class BoundContext(
string executorId,
TestRunContext runnerContext,
IReadOnlyDictionary<string, string>? traceContext) : IWorkflowContext
{
public ValueTask AddEventAsync(WorkflowEvent workflowEvent)
=> runnerContext.AddEventAsync(workflowEvent);
@@ -34,6 +37,8 @@ public class TestRunContext : IRunnerContext
public ValueTask SendMessageAsync(object message, string? targetId = null)
=> runnerContext.SendMessageAsync(executorId, message, targetId);
public IReadOnlyDictionary<string, string>? TraceContext => traceContext;
}
public List<WorkflowEvent> Events { get; } = [];
@@ -44,7 +49,8 @@ public class TestRunContext : IRunnerContext
return default;
}
// Binds this test context to an executor. The optional trace context is handed to the bound
// context, which exposes it through its TraceContext property; callers that pass only
// executorId get a null trace context.
public IWorkflowContext Bind(string executorId, Dictionary<string, string>? traceContext = null)
    => new BoundContext(executorId, this, traceContext);
public List<ExternalRequest> ExternalRequests { get; } = [];
public ValueTask PostAsync(ExternalRequest request)