Files
Shyju Krishnankutty cbcdb2d29e .NET: Add durable workflow support (#4436)
* .NET: [Feature Branch] Add basic durable workflow support (#3648)

* Add basic durable workflow support.

* PR feedback fixes

* Add conditional edge sample.

* PR feedback fixes.

* Minor cleanup.

* Minor cleanup

* Minor formatting improvements.

* Improve comments/documentation on the execution flow.

* .NET: [Feature Branch] Add Azure Functions hosting support for durable workflows (#3935)

* Adding azure functions workflow support.

* - PR feedback fixes.
- Add example to demonstrate complex Object as payload.

* rename instanceId to runId.

* Use custom ITaskOrchestrator to run orchestrator function.

* .NET: [Feature Branch] Adding support for events & shared state in durable workflows (#4020)

* Adding support for events & shared state in durable workflows.

* PR feedback fixes

* PR feedback fixes.

* Add YieldOutputAsync calls to 05_WorkflowEvents sample executors

The integration test asserts that WorkflowOutputEvent is found in the
stream, but the sample executors only used AddEventAsync for custom
events and never called YieldOutputAsync. Since WorkflowOutputEvent is
only emitted via explicit YieldOutputAsync calls, the assertion would
fail. Added YieldOutputAsync to each executor to match the test
expectation and demonstrate the API in the sample.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix deserialization to use shared serializer options.

* PR feedback updates.

* Sample cleanup

* PR feedback fixes

* Addressing PR review feedback for DurableStreamingWorkflowRun

   - Use -1 instead of 0 for taskId in TaskFailedException when task ID is not relevant.
   - Add [NotNullWhen(true)] to TryParseWorkflowResult out parameter following .NET TryXXX conventions.

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* .NET: [Feature Branch]  Add nested sub-workflow support for durable workflows (#4190)

* .NET: [Feature Branch] Add nested sub-workflow support for durable workflows

* fix readme path

* Switch Orchestration output from string to DurableWorkflowResult.

* PR feedback fixes

* Minor cleanup based on PR feedback.

* .NET: [Feature Branch] Add Human In the Loop support for durable workflows (#4358)

* Add Azure Functions HITL workflow sample

Add 06_WorkflowHITL Azure Functions sample demonstrating Human-in-the-Loop
workflow support with HTTP endpoints for status checking and approval responses.

The sample includes:
- ExpenseReimbursement workflow with RequestPort for manager approval
- Custom HTTP endpoint to check workflow status and pending approvals
- Custom HTTP endpoint to send approval responses via RaiseEventAsync
- demo.http file with step-by-step interaction examples

* PR feedback fixes

* Minor comment cleanup

* Minor comment clReverted the `!context.IsReplaying` guards on `PendingEvents.Add`/`RemoveAll` and `SetCustomStatus` in `ExecuteRequestPortAsync`. The guards broke fan-out scenarios where parallel RequestPorts      need to be discoverable after replay. `SetCustomStatus` is idempotent metadata that doesn't affect replay determinism.eanup

* fix  for PR feedback

* PR feedback updates

* Improvements to samples

* Improvements to README

* Update samples to use parallel request ports.

* Unit tests

* Introduce local variables to improve readability of Workflows.Workflows access patter

* Use GitHub-style callouts and add PowerShell command variants in HITL sample README

* Add changelog entries for durable workflow support (#4436)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Bump Microsoft.DurableTask.Worker to 1.19.1 to fix version downgrade

Microsoft.Azure.Functions.Worker.Extensions.DurableTask 1.13.1 requires
Microsoft.DurableTask.Worker >= 1.19.1 via its transitive dependency on
Microsoft.DurableTask.Worker.Grpc 1.19.1.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix broken markdown links in durable workflow sample READMEs

- Create Workflow/README.md with environment setup docs
- Fix ../README.md -> ../../README.md in ConsoleApps 01, 02, 03, 08
- Fix SubWorkflows relative path (3 levels -> 4 levels up)
- Fix dead Durable Task Scheduler URL

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix build errors from main merge: Throw conflict, ExecuteAsync rename, GetNewSessionAsync rename

- Remove InjectSharedThrow from DurableTask csproj (uses Workflows' internal Throw via InternalsVisibleTo)
- Update ExecuteAsync -> ExecuteCoreAsync with WorkflowTelemetryContext.Disabled
- Update GetNewSessionAsync -> CreateSessionAsync

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Move durable workflow samples to 04-hosting/DurableWorkflows

Aligns with main branch sample reorganization where durable samples
live under 04-hosting/ (alongside DurableAgents/).

- Move samples/Durable/Workflow/ -> samples/04-hosting/DurableWorkflows/
- Add Directory.Build.props matching DurableAgents pattern
- Update slnx project paths
- Update integration test sample paths
- Update README cd paths and cross-references

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix build errors: remove duplicate base class members, update renamed APIs

- Remove duplicate OutputLog, WriteInputAsync, CreateTestTimeoutCts, etc. from
  ConsoleAppSamplesValidation (already in SamplesValidationBase)
- Update AddFanInEdge -> AddFanInBarrierEdge in workflow samples
- Update GetNewSessionAsync -> CreateSessionAsync in workflow samples
- Update SourceId -> ExecutorId (obsolete) in workflow samples

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix dotnet format issues: add UTF-8 BOM and remove unused using

- Add UTF-8 BOM to 20 .cs files across DurableTask, AzureFunctions,
  unit tests, and workflow samples
- Remove unnecessary using directive in 07_SubWorkflows/Executors.cs

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix typo PaymentProcesser -> PaymentProcessor and garbled arrows in README

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix GetExecutorName to handle agent names with underscores

Split on last underscore instead of first, and validate that the
suffix is a 32-char hex string (sanitized GUID) before stripping it.
This prevents truncation of agent names like 'my_agent' when the
executor ID is 'my_agent_<guid>'.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Align DurableTask.Client.AzureManaged to 1.19.1

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Bump DurableTask and Azure Functions extension package versions

- DurableTask.* packages: 1.19.1 -> 1.22.0
- Functions.Worker.Extensions.DurableTask: 1.13.1 -> 1.16.0
- Functions.Worker.Extensions.DurableTask.AzureManaged: 1.0.1 -> 1.5.0 (telemetry bug fix)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Bump DurableTask SDK packages to 1.22.0

- DurableTask.Client: 1.19.1 -> 1.22.0
- DurableTask.Client.AzureManaged: 1.19.1 -> 1.22.0
- DurableTask.Worker: 1.19.1 -> 1.22.0
- DurableTask.Worker.AzureManaged: 1.19.1 -> 1.22.0
- Azure Functions extensions kept at original versions (1.13.1/1.0.1) due to
  host-side DurableTask.Core 3.7.0 incompatibility with newer extensions

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Update Microsoft.Azure.Functions.Worker.Extensions.DurableTask to "1.16.0"

* Add the local.settings.json files to the sample which were previously ignored. This aligns with our other samples.

* Increase timeout for tests as CI has them failing transiently.

* increaset timeout value for azure functions integration tests.

* Add YieldsOutput(string) to workflow shared state sample executors

ValidateOrder and EnrichOrder call YieldOutputAsync with string messages,
but only their TOutput (OrderDetails) was in the allowed yield types.
This caused TargetInvocationException in the WorkflowSharedState sample
validation integration test.

* Downgrade the durable packages to 1.18.0

* Downgrading Worker.Extensions.DurableTask to 1.12.1

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-16 23:00:50 +00:00

766 lines
30 KiB
C#

// Copyright (c) Microsoft. All rights reserved.
using System.Text.Json;
using Microsoft.Agents.AI.DurableTask.Workflows;
using Microsoft.Agents.AI.Workflows;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Moq;
namespace Microsoft.Agents.AI.DurableTask.UnitTests.Workflows;
public sealed class DurableStreamingWorkflowRunTests
{
private const string InstanceId = "test-instance-123";
private const string WorkflowTestName = "TestWorkflow";
private static Workflow CreateTestWorkflow() =>
new WorkflowBuilder(new FunctionExecutor<string>("start", (_, _, _) => default))
.WithName(WorkflowTestName)
.Build();
private static OrchestrationMetadata CreateMetadata(
OrchestrationRuntimeStatus status,
string? serializedCustomStatus = null,
string? serializedOutput = null,
TaskFailureDetails? failureDetails = null)
{
return new OrchestrationMetadata(WorkflowTestName, InstanceId)
{
RuntimeStatus = status,
SerializedCustomStatus = serializedCustomStatus,
SerializedOutput = serializedOutput,
FailureDetails = failureDetails,
};
}
private static string SerializeCustomStatus(List<string> events)
{
DurableWorkflowLiveStatus status = new() { Events = events };
return JsonSerializer.Serialize(status, DurableSerialization.Options);
}
private static string SerializeCustomStatusWithPendingEvents(
List<string> events,
List<PendingRequestPortStatus> pendingEvents)
{
DurableWorkflowLiveStatus status = new() { Events = events, PendingEvents = pendingEvents };
return JsonSerializer.Serialize(status, DurableSerialization.Options);
}
private static Workflow CreateTestWorkflowWithRequestPort(string requestPortId)
{
FunctionExecutor<string> start = new("start", (_, _, _) => default);
RequestPort<string, string> requestPort = RequestPort.Create<string, string>(requestPortId);
FunctionExecutor<string> end = new("end", (_, _, _) => default);
return new WorkflowBuilder(start)
.WithName(WorkflowTestName)
.AddEdge(start, requestPort)
.AddEdge(requestPort, end)
.Build();
}
private static string SerializeWorkflowResult(string? result, List<string> events)
{
DurableWorkflowResult workflowResult = new() { Result = result, Events = events };
return JsonSerializer.Serialize(workflowResult, DurableWorkflowJsonContext.Default.DurableWorkflowResult);
}
private static string SerializeEvent(WorkflowEvent evt)
{
Type eventType = evt.GetType();
TypedPayload wrapper = new()
{
TypeName = eventType.AssemblyQualifiedName,
Data = JsonSerializer.Serialize(evt, eventType, DurableSerialization.Options)
};
return JsonSerializer.Serialize(wrapper, DurableWorkflowJsonContext.Default.TypedPayload);
}
#region Constructor and Properties
[Fact]
public void Constructor_SetsRunIdAndWorkflowName()
{
// Arrange
Mock<DurableTaskClient> mockClient = new("test");
// Act
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Assert
Assert.Equal(InstanceId, run.RunId);
Assert.Equal(WorkflowTestName, run.WorkflowName);
}
[Fact]
public void Constructor_NoWorkflowName_SetsEmptyString()
{
// Arrange
Mock<DurableTaskClient> mockClient = new("test");
Workflow workflow = new WorkflowBuilder(new FunctionExecutor<string>("start", (_, _, _) => default)).Build();
// Act
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, workflow);
// Assert
Assert.Equal(string.Empty, run.WorkflowName);
}
#endregion
#region GetStatusAsync
[Theory]
[InlineData(OrchestrationRuntimeStatus.Pending, DurableRunStatus.Pending)]
[InlineData(OrchestrationRuntimeStatus.Running, DurableRunStatus.Running)]
[InlineData(OrchestrationRuntimeStatus.Completed, DurableRunStatus.Completed)]
[InlineData(OrchestrationRuntimeStatus.Failed, DurableRunStatus.Failed)]
[InlineData(OrchestrationRuntimeStatus.Terminated, DurableRunStatus.Terminated)]
[InlineData(OrchestrationRuntimeStatus.Suspended, DurableRunStatus.Suspended)]
public async Task GetStatusAsync_MapsRuntimeStatusCorrectlyAsync(
OrchestrationRuntimeStatus runtimeStatus,
DurableRunStatus expectedStatus)
{
// Arrange
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, false, It.IsAny<CancellationToken>()))
.ReturnsAsync(CreateMetadata(runtimeStatus));
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act
DurableRunStatus status = await run.GetStatusAsync();
// Assert
Assert.Equal(expectedStatus, status);
}
[Fact]
public async Task GetStatusAsync_InstanceNotFound_ReturnsNotFoundAsync()
{
// Arrange
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, false, It.IsAny<CancellationToken>()))
.ReturnsAsync((OrchestrationMetadata?)null);
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act
DurableRunStatus status = await run.GetStatusAsync();
// Assert
Assert.Equal(DurableRunStatus.NotFound, status);
}
#endregion
#region WatchStreamAsync
[Fact]
public async Task WatchStreamAsync_InstanceNotFound_YieldsNoEventsAsync()
{
// Arrange
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync((OrchestrationMetadata?)null);
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}
// Assert
Assert.Empty(events);
}
[Fact]
public async Task WatchStreamAsync_CompletedWithResult_YieldsCompletedEventAsync()
{
// Arrange
string serializedOutput = SerializeWorkflowResult("done", []);
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput));
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}
// Assert
Assert.Single(events);
DurableWorkflowCompletedEvent completedEvent = Assert.IsType<DurableWorkflowCompletedEvent>(events[0]);
Assert.Equal("done", completedEvent.Data);
}
[Fact]
public async Task WatchStreamAsync_CompletedWithEventsInOutput_YieldsEventsAndCompletionAsync()
{
// Arrange
DurableHaltRequestedEvent haltEvent = new("executor-1");
string serializedEvent = SerializeEvent(haltEvent);
string serializedOutput = SerializeWorkflowResult("result", [serializedEvent]);
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput));
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}
// Assert
Assert.Equal(2, events.Count);
DurableHaltRequestedEvent haltResult = Assert.IsType<DurableHaltRequestedEvent>(events[0]);
Assert.Equal("executor-1", haltResult.ExecutorId);
DurableWorkflowCompletedEvent completedResult = Assert.IsType<DurableWorkflowCompletedEvent>(events[1]);
Assert.Equal("result", completedResult.Result);
}
[Fact]
public async Task WatchStreamAsync_CompletedWithoutWrapper_YieldsFailedEventAsync()
{
// Arrange — output not wrapped in DurableWorkflowResult (indicates a bug)
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: "\"raw output\""));
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}
// Assert — yields a failed event with diagnostic message instead of crashing
Assert.Single(events);
DurableWorkflowFailedEvent failedEvent = Assert.IsType<DurableWorkflowFailedEvent>(events[0]);
Assert.Contains("could not be parsed", failedEvent.ErrorMessage);
}
[Fact]
public async Task WatchStreamAsync_Failed_YieldsFailedEventAsync()
{
// Arrange
Mock<DurableTaskClient> mockClient = new("test");
TaskFailureDetails failureDetails = new("ErrorType", "Something went wrong", null, null, null);
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(CreateMetadata(
OrchestrationRuntimeStatus.Failed,
failureDetails: failureDetails));
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}
// Assert
Assert.Single(events);
DurableWorkflowFailedEvent failedEvent = Assert.IsType<DurableWorkflowFailedEvent>(events[0]);
Assert.Equal("Something went wrong", failedEvent.ErrorMessage);
Assert.NotNull(failedEvent.FailureDetails);
Assert.Equal("ErrorType", failedEvent.FailureDetails.ErrorType);
Assert.Equal("Something went wrong", failedEvent.FailureDetails.ErrorMessage);
}
[Fact]
public async Task WatchStreamAsync_FailedWithNoDetails_YieldsDefaultMessageAsync()
{
// Arrange
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Failed));
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}
// Assert
Assert.Single(events);
DurableWorkflowFailedEvent failedEvent = Assert.IsType<DurableWorkflowFailedEvent>(events[0]);
Assert.Equal("Workflow execution failed.", failedEvent.ErrorMessage);
Assert.Null(failedEvent.FailureDetails);
}
[Fact]
public async Task WatchStreamAsync_Terminated_YieldsFailedEventAsync()
{
// Arrange
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Terminated));
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}
// Assert
Assert.Single(events);
DurableWorkflowFailedEvent failedEvent = Assert.IsType<DurableWorkflowFailedEvent>(events[0]);
Assert.Equal("Workflow was terminated.", failedEvent.ErrorMessage);
Assert.Null(failedEvent.FailureDetails);
}
[Fact]
public async Task WatchStreamAsync_EventsInCustomStatus_YieldsEventsBeforeCompletionAsync()
{
// Arrange
DurableHaltRequestedEvent haltEvent = new("exec-1");
string serializedEvent = SerializeEvent(haltEvent);
string customStatus = SerializeCustomStatus([serializedEvent]);
string serializedOutput = SerializeWorkflowResult("final", []);
int callCount = 0;
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(() =>
{
callCount++;
if (callCount == 1)
{
return CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: customStatus);
}
return CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput);
});
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}
// Assert
Assert.Equal(2, events.Count);
DurableHaltRequestedEvent haltResult = Assert.IsType<DurableHaltRequestedEvent>(events[0]);
Assert.Equal("exec-1", haltResult.ExecutorId);
DurableWorkflowCompletedEvent completedResult = Assert.IsType<DurableWorkflowCompletedEvent>(events[1]);
Assert.Equal("final", completedResult.Result);
}
[Fact]
public async Task WatchStreamAsync_IncrementalEvents_YieldsOnlyNewEventsPerPollAsync()
{
// Arrange — simulate 3 poll cycles where events accumulate in custom status,
// then a final completion poll. This validates:
// 1. Events arriving across multiple poll cycles are yielded incrementally
// 2. Already-seen events are not re-yielded (lastReadEventIndex dedup)
// 3. Completion event follows all streamed events
DurableHaltRequestedEvent event1 = new("executor-1");
DurableHaltRequestedEvent event2 = new("executor-2");
DurableHaltRequestedEvent event3 = new("executor-3");
string serializedEvent1 = SerializeEvent(event1);
string serializedEvent2 = SerializeEvent(event2);
string serializedEvent3 = SerializeEvent(event3);
// Poll 1: 1 event in custom status
string customStatus1 = SerializeCustomStatus([serializedEvent1]);
// Poll 2: same event + 1 new event (accumulating list)
string customStatus2 = SerializeCustomStatus([serializedEvent1, serializedEvent2]);
// Poll 3: all 3 events accumulated
string customStatus3 = SerializeCustomStatus([serializedEvent1, serializedEvent2, serializedEvent3]);
// Poll 4: completed, all events also in output
string serializedOutput = SerializeWorkflowResult("done", [serializedEvent1, serializedEvent2, serializedEvent3]);
int callCount = 0;
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(() =>
{
callCount++;
return callCount switch
{
1 => CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: customStatus1),
2 => CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: customStatus2),
3 => CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: customStatus3),
_ => CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput),
};
});
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}
// Assert — exactly 4 events: 3 incremental halt events + 1 completion
Assert.Equal(4, events.Count);
DurableHaltRequestedEvent halt1 = Assert.IsType<DurableHaltRequestedEvent>(events[0]);
DurableHaltRequestedEvent halt2 = Assert.IsType<DurableHaltRequestedEvent>(events[1]);
DurableHaltRequestedEvent halt3 = Assert.IsType<DurableHaltRequestedEvent>(events[2]);
Assert.Equal("executor-1", halt1.ExecutorId);
Assert.Equal("executor-2", halt2.ExecutorId);
Assert.Equal("executor-3", halt3.ExecutorId);
DurableWorkflowCompletedEvent completed = Assert.IsType<DurableWorkflowCompletedEvent>(events[3]);
Assert.Equal("done", completed.Data);
}
[Fact]
public async Task WatchStreamAsync_NoNewEventsOnRepoll_DoesNotDuplicateAsync()
{
// Arrange — simulate polling where custom status doesn't change between polls,
// validating that events are not duplicated when the list is unchanged.
DurableHaltRequestedEvent event1 = new("executor-1");
string serializedEvent1 = SerializeEvent(event1);
string customStatus = SerializeCustomStatus([serializedEvent1]);
string serializedOutput = SerializeWorkflowResult("result", [serializedEvent1]);
int callCount = 0;
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(() =>
{
callCount++;
return callCount switch
{
// First 3 polls return the same custom status (no new events after first)
<= 3 => CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: customStatus),
_ => CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput),
};
});
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}
// Assert — event1 appears exactly once despite 3 polls with the same status
Assert.Equal(2, events.Count);
DurableHaltRequestedEvent haltResult = Assert.IsType<DurableHaltRequestedEvent>(events[0]);
Assert.Equal("executor-1", haltResult.ExecutorId);
DurableWorkflowCompletedEvent completedResult = Assert.IsType<DurableWorkflowCompletedEvent>(events[1]);
Assert.Equal("result", completedResult.Result);
}
[Fact]
public async Task WatchStreamAsync_Cancellation_EndsGracefullyAsync()
{
// Arrange
using CancellationTokenSource cts = new();
int pollCount = 0;
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(() =>
{
if (++pollCount >= 2)
{
cts.Cancel();
}
return CreateMetadata(OrchestrationRuntimeStatus.Running);
});
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token))
{
events.Add(evt);
}
// Assert — no exception thrown, stream ends cleanly
Assert.Empty(events);
}
[Fact]
public async Task WatchStreamAsync_PendingRequestPort_YieldsWaitingForInputEventAsync()
{
// Arrange
string customStatus = SerializeCustomStatusWithPendingEvents(
[],
[new PendingRequestPortStatus("ApprovalPort", """{"amount":100}""")]);
string serializedOutput = SerializeWorkflowResult("approved", []);
int callCount = 0;
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(() =>
{
callCount++;
return callCount == 1
? CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: customStatus)
: CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput);
});
Workflow workflow = CreateTestWorkflowWithRequestPort("ApprovalPort");
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, workflow);
// Act
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}
// Assert
Assert.Equal(2, events.Count);
DurableWorkflowWaitingForInputEvent waitingEvent = Assert.IsType<DurableWorkflowWaitingForInputEvent>(events[0]);
Assert.Equal("ApprovalPort", waitingEvent.RequestPort.Id);
Assert.Contains("amount", waitingEvent.Input);
DurableWorkflowCompletedEvent completedEvent = Assert.IsType<DurableWorkflowCompletedEvent>(events[1]);
Assert.Equal("approved", completedEvent.Result);
}
[Fact]
public async Task WatchStreamAsync_PendingRequestPort_DoesNotDuplicateOnSubsequentPollsAsync()
{
// Arrange — same pending event across 2 polls, then completion
string customStatus = SerializeCustomStatusWithPendingEvents(
[],
[new PendingRequestPortStatus("ApprovalPort", """{"amount":100}""")]);
string serializedOutput = SerializeWorkflowResult("done", []);
int callCount = 0;
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(() =>
{
callCount++;
return callCount switch
{
<= 2 => CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: customStatus),
_ => CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput),
};
});
Workflow workflow = CreateTestWorkflowWithRequestPort("ApprovalPort");
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, workflow);
// Act
List<WorkflowEvent> events = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
events.Add(evt);
}
// Assert — WaitingForInputEvent yielded only once despite 2 polls
Assert.Equal(2, events.Count);
Assert.IsType<DurableWorkflowWaitingForInputEvent>(events[0]);
Assert.IsType<DurableWorkflowCompletedEvent>(events[1]);
}
#endregion
#region SendResponseAsync
[Fact]
public async Task SendResponseAsync_SerializesAndRaisesEventAsync()
{
// Arrange
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.RaiseEventAsync(
InstanceId,
"ApprovalPort",
It.IsAny<string>(),
It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);
RequestPort approvalPort = RequestPort.Create<string, string>("ApprovalPort");
DurableWorkflowWaitingForInputEvent requestEvent = new("""{"amount":100}""", approvalPort);
Workflow workflow = CreateTestWorkflowWithRequestPort("ApprovalPort");
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, workflow);
// Act
await run.SendResponseAsync(requestEvent, new { approved = true, comments = "Looks good" });
// Assert
mockClient.Verify(c => c.RaiseEventAsync(
InstanceId,
"ApprovalPort",
It.Is<string>(s => s.Contains("approved") && s.Contains("true")),
It.IsAny<CancellationToken>()), Times.Once);
}
[Fact]
public async Task SendResponseAsync_NullRequestEvent_ThrowsAsync()
{
// Arrange
Mock<DurableTaskClient> mockClient = new("test");
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act & Assert
await Assert.ThrowsAsync<ArgumentNullException>(() =>
run.SendResponseAsync(null!, "response").AsTask());
}
#endregion
#region WaitForCompletionAsync
[Fact]
public async Task WaitForCompletionAsync_Completed_ReturnsResultAsync()
{
// Arrange
string serializedOutput = SerializeWorkflowResult("hello world", []);
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.WaitForInstanceCompletionAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput));
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act
string? result = await run.WaitForCompletionAsync<string>();
// Assert
Assert.Equal("hello world", result);
}
[Fact]
public async Task WaitForCompletionAsync_Failed_ThrowsTaskFailedExceptionAsync()
{
// Arrange
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.WaitForInstanceCompletionAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(CreateMetadata(
OrchestrationRuntimeStatus.Failed,
failureDetails: new TaskFailureDetails("Error", "kaboom", null, null, null)));
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act & Assert
TaskFailedException ex = await Assert.ThrowsAsync<TaskFailedException>(
() => run.WaitForCompletionAsync<string>().AsTask());
Assert.Equal("kaboom", ex.FailureDetails.ErrorMessage);
}
[Fact]
public async Task WaitForCompletionAsync_UnexpectedStatus_ThrowsAsync()
{
// Arrange
Mock<DurableTaskClient> mockClient = new("test");
mockClient.Setup(c => c.WaitForInstanceCompletionAsync(InstanceId, true, It.IsAny<CancellationToken>()))
.ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Terminated));
DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow());
// Act & Assert
await Assert.ThrowsAsync<InvalidOperationException>(
() => run.WaitForCompletionAsync<string>().AsTask());
}
#endregion
#region ExtractResult
[Fact]
public void ExtractResult_NullOutput_ReturnsDefault()
{
// Act
string? result = DurableStreamingWorkflowRun.ExtractResult<string>(null);
// Assert
Assert.Null(result);
}
[Fact]
public void ExtractResult_WrappedStringResult_ReturnsUnwrappedString()
{
// Arrange
string serializedOutput = SerializeWorkflowResult("hello", []);
// Act
string? result = DurableStreamingWorkflowRun.ExtractResult<string>(serializedOutput);
// Assert
Assert.Equal("hello", result);
}
[Fact]
public void ExtractResult_UnwrappedOutput_ThrowsInvalidOperationException()
{
// Arrange — raw output not wrapped in DurableWorkflowResult
string serializedOutput = JsonSerializer.Serialize("raw value");
// Act & Assert
Assert.Throws<InvalidOperationException>(
() => DurableStreamingWorkflowRun.ExtractResult<string>(serializedOutput));
}
[Fact]
public void ExtractResult_WrappedObjectResult_DeserializesCorrectly()
{
// Arrange
TestPayload original = new() { Name = "test", Value = 42 };
string resultJson = JsonSerializer.Serialize(original);
string serializedOutput = SerializeWorkflowResult(resultJson, []);
// Act
TestPayload? result = DurableStreamingWorkflowRun.ExtractResult<TestPayload>(serializedOutput);
// Assert
Assert.NotNull(result);
Assert.Equal("test", result.Name);
Assert.Equal(42, result.Value);
}
[Fact]
public void ExtractResult_CamelCaseSerializedObject_DeserializesToPascalCaseMembers()
{
// Arrange — executor outputs are serialized with DurableSerialization.Options (camelCase)
TestPayload original = new() { Name = "camel", Value = 99 };
string resultJson = JsonSerializer.Serialize(original, DurableSerialization.Options);
string serializedOutput = SerializeWorkflowResult(resultJson, []);
// Act
TestPayload? result = DurableStreamingWorkflowRun.ExtractResult<TestPayload>(serializedOutput);
// Assert
Assert.NotNull(result);
Assert.Equal("camel", result.Name);
Assert.Equal(99, result.Value);
}
#endregion
private sealed class TestPayload
{
public string? Name { get; set; }
public int Value { get; set; }
}
}