Fix Foreach body exit wiring in declarative workflows (#6050)

This commit is contained in:
Peter Ibekwe
2026-05-25 23:37:35 -07:00
committed by GitHub
Unverified
parent 08541ee5a9
commit b2e77067e9
3 changed files with 137 additions and 13 deletions
@@ -817,10 +817,14 @@ class DeclarativeWorkflowBuilder:
condition=lambda msg: isinstance(msg, LoopIterationResult) and msg.has_next,
)
# Body exit -> Next (get all exits from body and wire to next_executor)
body_exits = self._get_source_exits(body_entry)
for body_exit in body_exits:
builder.add_edge(source=body_exit, target=next_executor)
# Wire from the LAST body action so the loop only advances after the
# whole body completes. _get_branch_exit walks the chain, skips
# terminators (Break/Continue), and returns nested If/Switch
# structures so _get_source_exits can flatten their branch exits.
body_exit = self._get_branch_exit(body_entry)
if body_exit is not None:
for source_exit in self._get_source_exits(body_exit):
builder.add_edge(source=source_exit, target=next_executor)
# Next -> body (when has_next=True, loop back)
builder.add_edge(
@@ -1008,16 +1012,12 @@ class DeclarativeWorkflowBuilder:
return entry.evaluator if is_structure else entry
def _get_branch_exit(self, branch_entry: Any) -> Any | None:
"""Get the exit executor of a branch.
"""Get the exit point of a branch for downstream wiring.
For a linear sequence of actions, returns the last executor.
For nested structures, returns None (they have their own branch_exits).
Args:
branch_entry: The first executor of the branch
Returns:
The exit executor, or None if branch is empty or ends with a structure
Returns the last executor (or its ``_exit_executor``) for a linear chain,
the nested If/Switch structure itself when the chain ends in one (so
callers can flatten ``branch_exits`` via :meth:`_get_source_exits`), or
``None`` when the branch is empty or ends in a terminator action.
"""
if branch_entry is None:
return None
@@ -2224,6 +2224,101 @@ class TestBuilderControlFlowCreation:
class TestBuilderEdgeWiring:
"""Tests for builder edge wiring methods."""
def test_foreach_advance_edge_wired_from_last_body_action(self):
"""Advance edge must come from the last body action."""
from agent_framework_declarative._workflows import DeclarativeWorkflowBuilder
yaml_def = {
"name": "foreach_seq",
"actions": [
{"kind": "SetValue", "id": "set_items", "path": "Local.items", "value": ["A", "B"]},
{
"kind": "Foreach",
"id": "loop",
"itemsSource": "=Local.items",
"iteratorVariable": "Local.item",
"actions": [
{"kind": "SendActivity", "id": "step_1", "activity": {"text": "one"}},
{"kind": "SendActivity", "id": "step_2", "activity": {"text": "two"}},
{"kind": "SendActivity", "id": "step_3", "activity": {"text": "three"}},
],
},
],
}
workflow = DeclarativeWorkflowBuilder(yaml_def).build()
edges = {(e.source_id, e.target_id) for group in workflow.edge_groups for e in group.edges}
assert ("step_3", "loop_next") in edges
assert ("step_1", "loop_next") not in edges
assert ("step_2", "loop_next") not in edges
assert ("step_1", "step_2") in edges
assert ("step_2", "step_3") in edges
def test_foreach_advance_edge_skipped_for_terminator_body(self):
"""BreakLoop at end of body wires itself to loop_next; no duplicate edge."""
from agent_framework_declarative._workflows import DeclarativeWorkflowBuilder
yaml_def = {
"name": "foreach_terminator",
"actions": [
{"kind": "SetValue", "id": "set_items", "path": "Local.items", "value": ["A"]},
{
"kind": "Foreach",
"id": "loop",
"itemsSource": "=Local.items",
"iteratorVariable": "Local.item",
"actions": [
{"kind": "SendActivity", "id": "step_1", "activity": {"text": "one"}},
{"kind": "BreakLoop", "id": "stop"},
],
},
],
}
workflow = DeclarativeWorkflowBuilder(yaml_def).build()
all_edges = [(e.source_id, e.target_id) for group in workflow.edge_groups for e in group.edges]
assert all_edges.count(("stop", "loop_next")) == 1
assert ("step_1", "loop_next") not in all_edges
def test_foreach_advance_edge_with_if_as_last_body_action(self):
"""Trailing If in a Foreach body wires every branch exit to loop_next."""
from agent_framework_declarative._workflows import DeclarativeWorkflowBuilder
yaml_def = {
"name": "foreach_if_last",
"actions": [
{"kind": "SetValue", "id": "set_items", "path": "Local.items", "value": ["A", "B"]},
{
"kind": "Foreach",
"id": "loop",
"itemsSource": "=Local.items",
"iteratorVariable": "Local.item",
"actions": [
{"kind": "SendActivity", "id": "step_1", "activity": {"text": "one"}},
{
"kind": "If",
"id": "check",
"condition": '=Local.item = "A"',
"then": [
{"kind": "SendActivity", "id": "then_action", "activity": {"text": "then"}},
],
"else": [
{"kind": "SendActivity", "id": "else_action", "activity": {"text": "else"}},
],
},
],
},
],
}
workflow = DeclarativeWorkflowBuilder(yaml_def).build()
edges = {(e.source_id, e.target_id) for group in workflow.edge_groups for e in group.edges}
assert ("then_action", "loop_next") in edges
assert ("else_action", "loop_next") in edges
assert ("step_1", "loop_next") not in edges
def test_wire_to_target_with_if_structure(self):
"""Test wiring to an If structure routes to evaluator."""
from agent_framework import WorkflowBuilder
@@ -121,6 +121,35 @@ class TestGraphBasedWorkflowExecution:
assert "b" in outputs
assert "c" in outputs
@pytest.mark.asyncio
async def test_foreach_multi_action_body_runs_sequentially(self):
"""Body actions must complete per item before advancing."""
yaml_def = {
"name": "loop_sequential_body",
"actions": [
{"kind": "SetValue", "id": "set_items", "path": "Local.items", "value": ["A", "B"]},
{
"kind": "Foreach",
"id": "loop",
"itemsSource": "=Local.items",
"iteratorVariable": "Local.item",
"actions": [
{"kind": "SendActivity", "id": "step_1", "activity": {"text": '="1-" & Local.item'}},
{"kind": "SendActivity", "id": "step_2", "activity": {"text": '="2-" & Local.item'}},
{"kind": "SendActivity", "id": "step_3", "activity": {"text": '="3-" & Local.item'}},
],
},
],
}
builder = DeclarativeWorkflowBuilder(yaml_def)
workflow = builder.build()
events = await workflow.run(ActionTrigger())
outputs = events.get_outputs()
assert outputs == ["1-A", "2-A", "3-A", "1-B", "2-B", "3-B"]
@pytest.mark.asyncio
async def test_workflow_with_switch(self):
"""Test workflow with Switch/ConditionGroup."""