Python: [Breaking] Remove Python-only declarative actions and rename alias kinds to C# canonical names (#6126)

* Remove Python-only declarative actions and rename alias kinds to C# canonical names

* Address PR comments.

* Address PR comments.

* Reduce verbose and duplicate output from sample workflow.
This commit is contained in:
Peter Ibekwe
2026-05-28 03:16:22 -07:00
committed by GitHub
Unverified
parent 55dc3ce734
commit ded17b178c
15 changed files with 577 additions and 771 deletions
@@ -38,10 +38,8 @@ from ._executors_agents import (
)
from ._executors_basic import (
BASIC_ACTION_EXECUTORS,
AppendValueExecutor,
ClearAllVariablesExecutor,
CreateConversationExecutor,
EmitEventExecutor,
ResetVariableExecutor,
SendActivityExecutor,
SetMultipleVariablesExecutor,
@@ -61,12 +59,10 @@ from ._executors_control_flow import (
)
from ._executors_external_input import (
EXTERNAL_INPUT_EXECUTORS,
ConfirmationExecutor,
ExternalInputRequest,
ExternalInputResponse,
QuestionExecutor,
RequestExternalInputExecutor,
WaitForInputExecutor,
)
from ._executors_http import (
HTTP_ACTION_EXECUTORS,
@@ -122,11 +118,9 @@ __all__ = [
"AgentExternalInputRequest",
"AgentExternalInputResponse",
"AgentResult",
"AppendValueExecutor",
"BaseToolExecutor",
"BreakLoopExecutor",
"ClearAllVariablesExecutor",
"ConfirmationExecutor",
"ContinueLoopExecutor",
"ConversationData",
"CreateConversationExecutor",
@@ -139,7 +133,6 @@ __all__ = [
"DeclarativeWorkflowState",
"DefaultHttpRequestHandler",
"DefaultMCPToolHandler",
"EmitEventExecutor",
"EndConversationExecutor",
"EndWorkflowExecutor",
"ExternalInputRequest",
@@ -173,7 +166,6 @@ __all__ = [
"ToolApprovalResponse",
"ToolApprovalState",
"ToolInvocationResult",
"WaitForInputExecutor",
"WorkflowFactory",
"WorkflowState",
]
@@ -915,9 +915,9 @@ class ActionComplete:
@dataclass
class ConditionResult:
"""Result of evaluating a condition (If/Switch).
"""Result of evaluating a condition (If/ConditionGroup).
This message is output by ConditionEvaluatorExecutor and SwitchEvaluatorExecutor
This message is output by ConditionEvaluatorExecutor and ConditionGroupEvaluatorExecutor
to indicate which branch should be taken.
"""
@@ -7,7 +7,7 @@ This module provides the DeclarativeWorkflowBuilder which is analogous to
action definitions and creates a proper workflow graph with:
- Executor nodes for each action
- Edges for sequential flow
- Condition evaluator executors for If/Switch that ensure first-match semantics
- Condition evaluator executors for If/ConditionGroup that ensure first-match semantics
- Loop edges for foreach
"""
@@ -38,7 +38,6 @@ from ._executors_control_flow import (
ForeachNextExecutor,
IfConditionEvaluatorExecutor,
JoinExecutor,
SwitchEvaluatorExecutor,
)
from ._executors_external_input import EXTERNAL_INPUT_EXECUTORS
from ._executors_http import HTTP_ACTION_EXECUTORS, HttpRequestActionExecutor
@@ -64,7 +63,6 @@ ALL_ACTION_EXECUTORS = {
# Action kinds that terminate control flow (no fall-through to successor)
# These actions transfer control elsewhere and should not have sequential edges to the next action
TERMINATOR_ACTIONS = frozenset({
"Goto",
"GotoAction",
"BreakLoop",
"ContinueLoop",
@@ -80,18 +78,16 @@ TERMINATOR_ACTIONS = frozenset({
ACTION_REQUIRED_FIELDS: dict[str, list[str]] = {
"SetValue": ["path"],
"SetVariable": ["variable"],
"AppendValue": ["path", "value"],
"SendActivity": ["activity"],
"InvokeAzureAgent": ["agent"],
"Goto": ["target"],
"GotoAction": ["actionId"],
"Foreach": ["items", "actions"],
"Foreach": ["source", "actions"],
"If": ["condition"],
"Switch": ["value"], # Switch can use value/cases or conditions (ConditionGroup style)
"ConditionGroup": ["conditions"],
"Question": ["question", "variable"],
"RequestExternalInput": ["prompt", "variable"],
"RequestHumanInput": ["variable"],
"WaitForHumanInput": ["variable"],
"EmitEvent": ["event"],
"InvokeFunctionTool": ["functionName"],
"HttpRequestAction": ["url"],
"InvokeMcpTool": ["serverUrl", "toolName"],
@@ -101,11 +97,14 @@ ACTION_REQUIRED_FIELDS: dict[str, list[str]] = {
# Key: "ActionKind.field", Value: list of alternates that satisfy the requirement
ACTION_ALTERNATE_FIELDS: dict[str, list[str]] = {
"SetValue.path": ["variable"],
"Goto.target": ["actionId"],
"GotoAction.actionId": ["target"],
"InvokeAzureAgent.agent": ["agentName"],
"Foreach.items": ["itemsSource", "source"], # source is used in some schemas
"Switch.value": ["conditions"], # Switch can be condition-based instead of value-based
# Top-level alternates that satisfy the nested-shape requirements without forcing
# callers to spell every field in its long form.
"Question.question": ["text"],
"Question.variable": ["property"],
"RequestExternalInput.prompt": ["message"],
"RequestExternalInput.variable": ["property"],
}
@@ -115,9 +114,9 @@ class DeclarativeWorkflowBuilder:
This builder transforms declarative action definitions into a proper
workflow graph with executor nodes and edges. It handles:
- Sequential actions (simple edges)
- Conditional branching (If/Switch with condition edges)
- Conditional branching (If/ConditionGroup with condition edges)
- Loops (Foreach with loop edges)
- Jumps (Goto with target edges)
- Jumps (GotoAction with target edges)
Example usage:
yaml_def = {
@@ -299,7 +298,7 @@ class DeclarativeWorkflowBuilder:
raise ValueError(f"Action '{kind}' is missing required field '{field}'. Action: {action_def}")
# Collect goto targets for circular reference detection
if kind in ("Goto", "GotoAction"):
if kind == "GotoAction":
target = action_def.get("target") or action_def.get("actionId")
if target:
goto_targets.append((target, explicit_id))
@@ -313,13 +312,19 @@ class DeclarativeWorkflowBuilder:
if else_actions:
self._validate_actions_recursive(else_actions, seen_ids, goto_targets, defined_ids)
elif kind in ("Switch", "ConditionGroup"):
cases = action_def.get("cases", action_def.get("conditions", []))
for case in cases:
case_actions = case.get("actions", [])
if case_actions:
self._validate_actions_recursive(case_actions, seen_ids, goto_targets, defined_ids)
else_actions = action_def.get("elseActions", action_def.get("else", action_def.get("default", [])))
elif kind == "ConditionGroup":
for forbidden in ("else", "default"):
if forbidden in action_def:
raise ValueError(
f"Action 'ConditionGroup' field '{forbidden}' is not supported; "
"use 'elseActions' instead."
)
conditions = action_def.get("conditions", [])
for condition_branch in conditions:
branch_actions = condition_branch.get("actions", [])
if branch_actions:
self._validate_actions_recursive(branch_actions, seen_ids, goto_targets, defined_ids)
else_actions = action_def.get("elseActions", [])
if else_actions:
self._validate_actions_recursive(else_actions, seen_ids, goto_targets, defined_ids)
@@ -362,7 +367,8 @@ class DeclarativeWorkflowBuilder:
# Check for direct self-reference
if source_id and target_id == source_id:
raise ValueError(
f"Action '{source_id}' has a direct self-referencing Goto, which would cause an infinite loop."
f"Action '{source_id}' has a direct self-referencing GotoAction, "
"which would cause an infinite loop."
)
def _resolve_pending_gotos(self, builder: WorkflowBuilder) -> None:
@@ -380,7 +386,7 @@ class DeclarativeWorkflowBuilder:
builder.add_edge(source=goto_executor, target=target_executor)
else:
available_ids = list(self._executors.keys())
raise ValueError(f"Goto target '{target_id}' not found. Available action IDs: {available_ids}")
raise ValueError(f"GotoAction target '{target_id}' not found. Available action IDs: {available_ids}")
def _create_executors_for_actions(
self,
@@ -453,11 +459,11 @@ class DeclarativeWorkflowBuilder:
# Handle special control flow actions
if kind == "If":
return self._create_if_structure(action_def, builder, parent_context)
if kind == "Switch" or kind == "ConditionGroup":
return self._create_switch_structure(action_def, builder, parent_context)
if kind == "ConditionGroup":
return self._create_condition_group_structure(action_def, builder, parent_context)
if kind == "Foreach":
return self._create_foreach_structure(action_def, builder, parent_context)
if kind == "Goto" or kind == "GotoAction":
if kind == "GotoAction":
return self._create_goto_reference(action_def, builder, parent_context)
if kind == "BreakLoop":
return self._create_break_executor(action_def, builder, parent_context)
@@ -588,7 +594,7 @@ class DeclarativeWorkflowBuilder:
# Wire evaluator to branches with conditions that check ConditionResult.branch_index
# branch_index=0 means "then" branch, branch_index=-1 (ELSE_BRANCH_INDEX) means "else"
# For nested If/Switch structures, wire to the evaluator (entry point)
# For nested If/ConditionGroup structures, wire to the evaluator (entry point)
if then_entry:
then_target = self._get_structure_entry(then_entry)
builder.add_edge(
@@ -634,66 +640,42 @@ class DeclarativeWorkflowBuilder:
return IfStructure()
def _create_switch_structure(
def _create_condition_group_structure(
self,
action_def: dict[str, Any],
builder: WorkflowBuilder,
parent_context: dict[str, Any] | None = None,
) -> Any:
"""Create the graph structure for a Switch/ConditionGroup action.
"""Create the graph structure for a ConditionGroup action.
Supports two schema formats:
1. ConditionGroup schema (matches .NET):
- conditions: list of {condition: expr, actions: [...]}
- elseActions: default actions
2. Switch schema (interpreter style):
- value: expression to match
- cases: list of {match: value, actions: [...]}
- default: default actions
Both use evaluator executors that output ConditionResult with branch_index
for first-match semantics.
Evaluates the action's ``conditions`` in order; the first match
selects its ``actions`` branch. If none match, ``elseActions`` runs.
The structure exposes an evaluator entry point and the per-branch
entry/exit pairs used by the caller to wire downstream edges.
Args:
action_def: The Switch/ConditionGroup action definition
action_def: The ConditionGroup action definition
builder: The workflow builder
parent_context: Context from parent
Returns:
A SwitchStructure containing branch info for wiring
A ConditionGroupStructure containing branch info for wiring
"""
action_id = action_def.get("id") or f"Switch_{self._action_index}"
action_id = action_def.get("id") or f"ConditionGroup_{self._action_index}"
self._action_index += 1
# Pass the Switch's ID as context for child action naming
# Pass the ConditionGroup's ID as context for child action naming
branch_context = {
**(parent_context or {}),
"parent_id": action_id,
}
# Detect schema type:
# - If "cases" present: interpreter Switch schema (value/cases/default)
# - If "conditions" present: ConditionGroup schema (conditions/elseActions)
cases = action_def.get("cases", [])
conditions = action_def.get("conditions", [])
if cases:
# Interpreter Switch schema: value/cases/default
evaluator: DeclarativeActionExecutor = SwitchEvaluatorExecutor(
action_def,
cases,
id=f"{action_id}_eval",
)
branch_items = cases
else:
# ConditionGroup schema: conditions/elseActions
evaluator = ConditionGroupEvaluatorExecutor(
action_def,
conditions,
id=f"{action_id}_eval",
)
branch_items = conditions
evaluator: DeclarativeActionExecutor = ConditionGroupEvaluatorExecutor(
action_def,
conditions,
id=f"{action_id}_eval",
)
self._executors[evaluator.id] = evaluator
@@ -701,7 +683,7 @@ class DeclarativeWorkflowBuilder:
branch_entries: list[tuple[int, Any]] = [] # (branch_index, entry_executor)
branch_exits: list[Any] = [] # All exits that need wiring to successor
for i, item in enumerate(branch_items):
for i, item in enumerate(conditions):
branch_actions = item.get("actions", [])
# Use branch-specific context
case_context = {**branch_context, "parent_id": f"{action_id}_case{i}"}
@@ -714,9 +696,7 @@ class DeclarativeWorkflowBuilder:
if branch_exit:
branch_exits.append(branch_exit)
# Handle else/default branch
# .NET uses "elseActions", interpreter uses "else" or "default"
else_actions = action_def.get("elseActions", action_def.get("else", action_def.get("default", [])))
else_actions = action_def.get("elseActions", [])
default_entry = None
default_passthrough = None
if else_actions:
@@ -734,7 +714,7 @@ class DeclarativeWorkflowBuilder:
branch_exits.append(default_passthrough)
# Wire evaluator to branches with conditions that check ConditionResult.branch_index
# For nested If/Switch structures, wire to the evaluator (entry point)
# For nested If/ConditionGroup structures, wire to the evaluator (entry point)
for branch_index, branch_entry in branch_entries:
# Capture branch_index in closure properly using a factory function for type inference
def make_branch_condition(expected: int) -> Any:
@@ -762,8 +742,8 @@ class DeclarativeWorkflowBuilder:
condition=lambda msg: isinstance(msg, ConditionResult) and msg.branch_index == ELSE_BRANCH_INDEX,
)
# Create a SwitchStructure to hold all the info needed for wiring
class SwitchStructure:
# Create a ConditionGroupStructure to hold all the info needed for wiring
class ConditionGroupStructure:
def __init__(self) -> None:
self.id = action_id
self.evaluator = evaluator # The entry point for this structure
@@ -771,9 +751,9 @@ class DeclarativeWorkflowBuilder:
self.default_entry = default_entry
self.default_passthrough = default_passthrough
self.branch_exits = branch_exits # All exits that need wiring to successor
self._is_switch_structure = True
self._is_condition_group_structure = True
return SwitchStructure()
return ConditionGroupStructure()
def _create_foreach_structure(
self,
@@ -823,7 +803,7 @@ class DeclarativeWorkflowBuilder:
body_entry = self._create_executors_for_actions(body_actions, builder, loop_context)
if body_entry:
# For nested If/Switch structures, wire to the evaluator (entry point)
# For nested If/ConditionGroup structures, wire to the evaluator (entry point)
body_target = self._get_structure_entry(body_entry)
# Init -> body (when has_next=True)
@@ -835,7 +815,7 @@ class DeclarativeWorkflowBuilder:
# Wire from the LAST body action so the loop only advances after the
# whole body completes. _get_branch_exit walks the chain, skips
# terminators (Break/Continue), and returns nested If/Switch
# terminators (Break/Continue), and returns nested If/ConditionGroup
# structures so _get_source_exits can flatten their branch exits.
body_exit = self._get_branch_exit(body_entry)
if body_exit is not None:
@@ -963,8 +943,8 @@ class DeclarativeWorkflowBuilder:
"""Add a sequential edge between two executors.
Handles control flow structures:
- If source is a structure (If/Switch), wire from all branch exits
- If target is a structure (If/Switch), wire with conditional edges to branches
- If source is a structure (If/ConditionGroup), wire from all branch exits
- If target is a structure (If/ConditionGroup), wire with conditional edges to branches
"""
# Get all source exit points
source_exits = self._get_source_exits(source)
@@ -999,12 +979,12 @@ class DeclarativeWorkflowBuilder:
) -> None:
"""Wire a single source executor to a target (which may be a structure).
For If/Switch structures, wire to the evaluator executor. The evaluator
For If/ConditionGroup structures, wire to the evaluator executor. The evaluator
handles condition evaluation and outputs ConditionResult, which is then
routed to the appropriate branch by edges created in _create_*_structure.
"""
# Check if target is an IfStructure or SwitchStructure (wire to evaluator)
if getattr(target, "_is_if_structure", False) or getattr(target, "_is_switch_structure", False):
# Check if target is an IfStructure or ConditionGroupStructure (wire to evaluator)
if getattr(target, "_is_if_structure", False) or getattr(target, "_is_condition_group_structure", False):
# Wire from source to the evaluator - the evaluator then routes to branches
builder.add_edge(source=source, target=target.evaluator)
@@ -1015,7 +995,7 @@ class DeclarativeWorkflowBuilder:
def _get_structure_entry(self, entry: Any) -> Any:
"""Get the entry point executor for a structure or regular executor.
For If/Switch structures, returns the evaluator. For regular executors,
For If/ConditionGroup structures, returns the evaluator. For regular executors,
returns the executor itself.
Args:
@@ -1024,14 +1004,16 @@ class DeclarativeWorkflowBuilder:
Returns:
The entry point executor
"""
is_structure = getattr(entry, "_is_if_structure", False) or getattr(entry, "_is_switch_structure", False)
is_structure = getattr(entry, "_is_if_structure", False) or getattr(
entry, "_is_condition_group_structure", False
)
return entry.evaluator if is_structure else entry
def _get_branch_exit(self, branch_entry: Any) -> Any | None:
"""Get the exit point of a branch for downstream wiring.
Returns the last executor (or its ``_exit_executor``) for a linear chain,
the nested If/Switch structure itself when the chain ends in one (so
the nested If/ConditionGroup structure itself when the chain ends in one (so
callers can flatten ``branch_exits`` via :meth:`_get_source_exits`), or
``None`` when the branch is empty or ends in a terminator action.
"""
@@ -179,28 +179,6 @@ class SetMultipleVariablesExecutor(DeclarativeActionExecutor):
await ctx.send_message(ActionComplete())
class AppendValueExecutor(DeclarativeActionExecutor):
"""Executor for the AppendValue action."""
@handler
async def handle_action(
self,
trigger: Any,
ctx: WorkflowContext[ActionComplete],
) -> None:
"""Handle the AppendValue action."""
state = await self._ensure_state_initialized(ctx, trigger)
path = self._action_def.get("path")
value = self._action_def.get("value")
if path:
evaluated_value = state.eval_if_expression(value)
state.append(path, evaluated_value)
await ctx.send_message(ActionComplete())
class ResetVariableExecutor(DeclarativeActionExecutor):
"""Executor for the ResetVariable action."""
@@ -279,47 +257,6 @@ class SendActivityExecutor(DeclarativeActionExecutor):
await ctx.send_message(ActionComplete())
class EmitEventExecutor(DeclarativeActionExecutor):
"""Executor for the EmitEvent action.
Emits a custom event to the workflow event stream.
Supports two schema formats:
1. Graph mode: eventName, eventValue
2. Interpreter mode: event.name, event.data
"""
@handler
async def handle_action(
self,
trigger: Any,
ctx: WorkflowContext[ActionComplete, dict[str, Any]],
) -> None:
"""Handle the EmitEvent action."""
state = await self._ensure_state_initialized(ctx, trigger)
# Support both schema formats:
# - Graph mode: eventName, eventValue
# - Interpreter mode: event.name, event.data
event_def = self._action_def.get("event", {})
event_name = self._action_def.get("eventName") or event_def.get("name", "")
event_value = self._action_def.get("eventValue")
if event_value is None:
event_value = event_def.get("data")
if event_name:
evaluated_name = state.eval_if_expression(event_name)
evaluated_value = state.eval_if_expression(event_value)
event_data = {
"eventName": evaluated_name,
"eventValue": evaluated_value,
}
await ctx.yield_output(event_data)
await ctx.send_message(ActionComplete())
class EditTableExecutor(DeclarativeActionExecutor):
"""Executor for the EditTable action.
@@ -628,11 +565,9 @@ BASIC_ACTION_EXECUTORS: dict[str, type[DeclarativeActionExecutor]] = {
"SetVariable": SetVariableExecutor,
"SetTextVariable": SetTextVariableExecutor,
"SetMultipleVariables": SetMultipleVariablesExecutor,
"AppendValue": AppendValueExecutor,
"ResetVariable": ResetVariableExecutor,
"ClearAllVariables": ClearAllVariablesExecutor,
"SendActivity": SendActivityExecutor,
"EmitEvent": EmitEventExecutor,
"ParseValue": ParseValueExecutor,
"EditTable": EditTableExecutor,
"EditTableV2": EditTableV2Executor,
@@ -3,7 +3,7 @@
"""Control flow executors for the graph-based declarative workflow system.
Control flow in the graph-based system is handled differently than the interpreter:
- If/Switch: Condition evaluation happens in a dedicated evaluator executor that
- If/ConditionGroup: Condition evaluation happens in a dedicated evaluator executor that
returns a ConditionResult with the first-matching branch index. Edge conditions
then check the branch_index to route to the correct branch. This ensures only
one branch executes (first-match semantics), matching the interpreter behavior.
@@ -39,7 +39,7 @@ ELSE_BRANCH_INDEX = -1
class ConditionGroupEvaluatorExecutor(DeclarativeActionExecutor):
"""Evaluates conditions for ConditionGroup/Switch and outputs the first-matching branch.
"""Evaluates conditions for ConditionGroup and outputs the first-matching branch.
This executor implements first-match semantics by evaluating conditions sequentially
and outputting a ConditionResult with the index of the first matching branch.
@@ -59,7 +59,7 @@ class ConditionGroupEvaluatorExecutor(DeclarativeActionExecutor):
"""Initialize the condition evaluator.
Args:
action_def: The ConditionGroup/Switch action definition
action_def: The ConditionGroup action definition
conditions: List of condition items, each with 'condition' and optional 'id'
id: Optional executor ID
"""
@@ -99,71 +99,6 @@ class ConditionGroupEvaluatorExecutor(DeclarativeActionExecutor):
await ctx.send_message(ConditionResult(matched=False, branch_index=ELSE_BRANCH_INDEX))
class SwitchEvaluatorExecutor(DeclarativeActionExecutor):
"""Evaluates a Switch action by matching a value against cases.
The Switch action uses a different schema than ConditionGroup:
- value: expression to evaluate once
- cases: list of {match: value_to_match, actions: [...]}
- default: default actions if no case matches
This evaluator evaluates the value expression once, then compares it
against each case's match value sequentially. First match wins.
"""
def __init__(
self,
action_def: dict[str, Any],
cases: list[dict[str, Any]],
*,
id: str | None = None,
):
"""Initialize the switch evaluator.
Args:
action_def: The Switch action definition (contains 'value' expression)
cases: List of case items, each with 'match' and optional 'actions'
id: Optional executor ID
"""
super().__init__(action_def, id=id)
self._cases = cases
@handler
async def handle_action(
self,
trigger: Any,
ctx: WorkflowContext[ConditionResult],
) -> None:
"""Evaluate the switch value and find the first matching case."""
state = await self._ensure_state_initialized(ctx, trigger)
value_expr = self._action_def.get("value")
if not value_expr:
# No value to switch on - use default
await ctx.send_message(ConditionResult(matched=False, branch_index=ELSE_BRANCH_INDEX))
return
# Evaluate the switch value once
switch_value = state.eval_if_expression(value_expr)
# Compare against each case's match value
for index, case_item in enumerate(self._cases):
match_expr = case_item.get("match")
if match_expr is None:
continue
# Evaluate the match value
match_value = state.eval_if_expression(match_expr)
if switch_value == match_value:
# Found matching case
await ctx.send_message(ConditionResult(matched=True, branch_index=index, value=switch_value))
return
# No case matched - use default branch
await ctx.send_message(ConditionResult(matched=False, branch_index=ELSE_BRANCH_INDEX))
class IfConditionEvaluatorExecutor(DeclarativeActionExecutor):
"""Evaluates a single If condition and outputs a ConditionResult.
@@ -221,12 +156,7 @@ class ForeachInitExecutor(DeclarativeActionExecutor):
"""Initialize the loop and check for first item."""
state = await self._ensure_state_initialized(ctx, trigger)
# Support multiple schema formats:
# - Graph mode: itemsSource, items
# - Interpreter mode: source
items_expr = (
self._action_def.get("itemsSource") or self._action_def.get("items") or self._action_def.get("source")
)
items_expr = self._action_def.get("source")
items_raw: Any = state.eval_if_expression(items_expr) or []
items: list[Any]
@@ -244,25 +174,12 @@ class ForeachInitExecutor(DeclarativeActionExecutor):
}
state.set_state_data(state_data)
# Check if we have items
if items:
# Set the iteration variable
# Support multiple schema formats:
# - Graph mode: iteratorVariable, item (default "Local.item")
# - Interpreter mode: itemName (default "item", stored in Local scope)
item_var = self._action_def.get("iteratorVariable") or self._action_def.get("item")
if not item_var:
# Interpreter mode: itemName defaults to "item", store in Local scope
item_name = self._action_def.get("itemName", "item")
item_var = f"Local.{item_name}"
# Support multiple schema formats for index:
# - Graph mode: indexVariable, index
# - Interpreter mode: indexName (default "index", stored in Local scope)
index_var = self._action_def.get("indexVariable") or self._action_def.get("index")
if not index_var and "indexName" in self._action_def:
index_name = self._action_def.get("indexName", "index")
index_var = f"Local.{index_name}"
# Bind the current item and (when requested) the index under the Local scope.
item_var = f"Local.{self._action_def.get('itemName', 'item')}"
index_var = (
f"Local.{self._action_def.get('indexName', 'index')}" if "indexName" in self._action_def else None
)
state.set(item_var, items[0])
if index_var:
@@ -325,23 +242,11 @@ class ForeachNextExecutor(DeclarativeActionExecutor):
loop_state["index"] = current_index
state.set_state_data(state_data)
# Set the iteration variable
# Support multiple schema formats:
# - Graph mode: iteratorVariable, item (default "Local.item")
# - Interpreter mode: itemName (default "item", stored in Local scope)
item_var = self._action_def.get("iteratorVariable") or self._action_def.get("item")
if not item_var:
# Interpreter mode: itemName defaults to "item", store in Local scope
item_name = self._action_def.get("itemName", "item")
item_var = f"Local.{item_name}"
# Support multiple schema formats for index:
# - Graph mode: indexVariable, index
# - Interpreter mode: indexName (default "index", stored in Local scope)
index_var = self._action_def.get("indexVariable") or self._action_def.get("index")
if not index_var and "indexName" in self._action_def:
index_name = self._action_def.get("indexName", "index")
index_var = f"Local.{index_name}"
# Rebind the current item and (when requested) the index under the Local scope.
item_var = f"Local.{self._action_def.get('itemName', 'item')}"
index_var = (
f"Local.{self._action_def.get('indexName', 'index')}" if "indexName" in self._action_def else None
)
state.set(item_var, items[current_index])
if index_var:
@@ -486,7 +391,7 @@ class EndConversationExecutor(DeclarativeActionExecutor):
class JoinExecutor(DeclarativeActionExecutor):
"""Executor that joins multiple branches back together.
Used after If/Switch to merge control flow back to a single path.
Used after If/ConditionGroup to merge control flow back to a single path.
Also used as passthrough nodes for else/default branches.
"""
@@ -2,14 +2,14 @@
"""External input executors for declarative workflows.
These executors handle interactions that require external input (user questions,
confirmations, etc.), using the request_info pattern to pause the workflow and
wait for responses.
These executors handle interactions that require external input (user questions
and external integrations), using the request_info pattern to pause the workflow
and wait for responses.
"""
import uuid
from dataclasses import dataclass, field
from typing import Any
from typing import Any, cast
from agent_framework import (
WorkflowContext,
@@ -23,18 +23,49 @@ from ._declarative_base import (
)
def _get_prompt_text(action_def: dict[str, Any], primary_key: str, fallback_key: str) -> Any:
"""Return the prompt text from an action definition.
Accepts a nested ``{primary_key: {"text": ...}}`` mapping, a bare
string under ``primary_key``, or a top-level ``fallback_key`` value.
"""
match action_def.get(primary_key):
case {"text": text}:
return text
case str() as text:
return text
case _:
return action_def.get(fallback_key, "")
def _get_output_path(action_def: dict[str, Any], default: str) -> str:
"""Return the state path where the action result should be written.
Looks at ``variable``, then ``output.property``, then top-level
``property``, falling back to ``default``.
"""
output = action_def.get("output")
nested = cast(dict[str, Any], output).get("property") if isinstance(output, dict) else None
return action_def.get("variable") or nested or action_def.get("property") or default
@dataclass
class ExternalInputRequest:
"""Request for external input (triggers workflow pause).
Aligns with .NET ExternalInputRequest pattern. Used by Question, Confirmation,
WaitForInput, and RequestExternalInput executors to signal that user input is
needed. The workflow will pause via request_info and wait for an ExternalInputResponse.
Aligns with .NET ExternalInputRequest pattern. Used by Question and
RequestExternalInput executors to signal that user input is needed.
The workflow will pause via request_info and wait for an ExternalInputResponse.
Attributes:
request_id: Unique identifier for this request.
message: The prompt or question to display to the user.
request_type: Type of input requested (question, confirmation, user_input, external).
request_type: A free-form discriminator describing the kind of input
being requested. ``QuestionExecutor`` emits ``"question"`` and
``RequestExternalInputExecutor`` defaults to ``"external"``; callers
may supply any other string via the ``requestType`` field on a
``RequestExternalInput`` action (e.g. ``"approval"``) and it is
propagated unchanged.
metadata: Additional context (choices, output_property, timeout, etc.).
"""
@@ -75,15 +106,12 @@ class QuestionExecutor(DeclarativeActionExecutor):
"""Ask the question and wait for a response."""
state = await self._ensure_state_initialized(ctx, trigger)
question_text = self._action_def.get("text") or self._action_def.get("question", "")
output_property = self._action_def.get("output", {}).get("property") or self._action_def.get(
"property", "Local.answer"
)
question_text = _get_prompt_text(self._action_def, primary_key="question", fallback_key="text")
output_property = _get_output_path(self._action_def, default="Local.answer")
default_value = self._action_def.get("default", self._action_def.get("defaultValue"))
choices = self._action_def.get("choices", [])
default_value = self._action_def.get("defaultValue")
allow_free_text = self._action_def.get("allowFreeText", True)
# Evaluate the question text if it's an expression
evaluated_question = state.eval_if_expression(question_text)
# Build choices metadata
@@ -139,133 +167,6 @@ class QuestionExecutor(DeclarativeActionExecutor):
await ctx.send_message(ActionComplete())
class ConfirmationExecutor(DeclarativeActionExecutor):
"""Executor that asks for a yes/no confirmation.
A specialized version of Question that expects a boolean response.
"""
@handler
async def handle_action(
self,
trigger: Any,
ctx: WorkflowContext[ActionComplete],
) -> None:
"""Ask for confirmation."""
state = await self._ensure_state_initialized(ctx, trigger)
message = self._action_def.get("text") or self._action_def.get("message", "")
output_property = self._action_def.get("output", {}).get("property") or self._action_def.get(
"property", "Local.confirmed"
)
yes_label = self._action_def.get("yesLabel", "Yes")
no_label = self._action_def.get("noLabel", "No")
default_value = self._action_def.get("defaultValue", False)
# Evaluate the message if it's an expression
evaluated_message = state.eval_if_expression(message)
# Request confirmation - workflow pauses here
await ctx.request_info(
ExternalInputRequest(
request_id=str(uuid.uuid4()),
message=str(evaluated_message),
request_type="confirmation",
metadata={
"output_property": output_property,
"yes_label": yes_label,
"no_label": no_label,
"default_value": default_value,
},
),
ExternalInputResponse,
)
@response_handler
async def handle_response(
self,
original_request: ExternalInputRequest,
response: ExternalInputResponse,
ctx: WorkflowContext[ActionComplete],
) -> None:
"""Handle the user's confirmation response."""
state = self._get_state(ctx.state)
output_property = original_request.metadata.get("output_property", "Local.confirmed")
# Convert response to boolean
if response.value is not None:
confirmed = bool(response.value)
else:
# Interpret common affirmative responses
user_input_lower = response.user_input.lower().strip()
confirmed = user_input_lower in ("yes", "y", "true", "1", "confirm", "ok")
if output_property:
state.set(output_property, confirmed)
await ctx.send_message(ActionComplete())
class WaitForInputExecutor(DeclarativeActionExecutor):
"""Executor that waits for user input during a conversation.
Used when the workflow needs to pause and wait for the next user message
in a conversational flow.
"""
@handler
async def handle_action(
self,
trigger: Any,
ctx: WorkflowContext[ActionComplete, str],
) -> None:
"""Wait for user input."""
state = await self._ensure_state_initialized(ctx, trigger)
prompt = self._action_def.get("prompt")
output_property = self._action_def.get("output", {}).get("property") or self._action_def.get(
"property", "Local.input"
)
timeout_seconds = self._action_def.get("timeout")
# Emit prompt if specified
if prompt:
evaluated_prompt = state.eval_if_expression(prompt)
await ctx.yield_output(str(evaluated_prompt))
# Request user input - workflow pauses here
await ctx.request_info(
ExternalInputRequest(
request_id=str(uuid.uuid4()),
message=str(prompt) if prompt else "Waiting for input...",
request_type="user_input",
metadata={
"output_property": output_property,
"timeout_seconds": timeout_seconds,
},
),
ExternalInputResponse,
)
@response_handler
async def handle_response(
self,
original_request: ExternalInputRequest,
response: ExternalInputResponse,
ctx: WorkflowContext[ActionComplete, str],
) -> None:
"""Handle the user's input."""
state = self._get_state(ctx.state)
output_property = original_request.metadata.get("output_property", "Local.input")
if output_property:
state.set(output_property, response.user_input)
await ctx.send_message(ActionComplete())
class RequestExternalInputExecutor(DeclarativeActionExecutor):
"""Executor that requests external input/approval.
@@ -282,16 +183,15 @@ class RequestExternalInputExecutor(DeclarativeActionExecutor):
"""Request external input."""
state = await self._ensure_state_initialized(ctx, trigger)
message = _get_prompt_text(self._action_def, primary_key="prompt", fallback_key="message")
output_property = _get_output_path(self._action_def, default="Local.externalInput")
default_value = self._action_def.get("default")
request_type = self._action_def.get("requestType", "external")
message = self._action_def.get("message", "")
output_property = self._action_def.get("output", {}).get("property") or self._action_def.get(
"property", "Local.externalInput"
)
timeout_seconds = self._action_def.get("timeout")
required_fields = self._action_def.get("requiredFields", [])
metadata = self._action_def.get("metadata", {})
# Evaluate the message if it's an expression
evaluated_message = state.eval_if_expression(message)
# Build request metadata
@@ -299,6 +199,7 @@ class RequestExternalInputExecutor(DeclarativeActionExecutor):
**metadata,
"output_property": output_property,
"required_fields": required_fields,
"default_value": default_value,
}
if timeout_seconds:
@@ -338,7 +239,5 @@ class RequestExternalInputExecutor(DeclarativeActionExecutor):
# Mapping of external input action kinds to executor classes
EXTERNAL_INPUT_EXECUTORS: dict[str, type[DeclarativeActionExecutor]] = {
"Question": QuestionExecutor,
"Confirmation": ConfirmationExecutor,
"WaitForInput": WaitForInputExecutor,
"RequestExternalInput": RequestExternalInputExecutor,
}