Python: Add factory pattern to sequential orchestration builder (#2710)

* Add factory pattern to sequential orchestration builder

* Use temp list to avoid override

* Add sample and some other fixes

* Fix comments

* Small fix

* Update readme
This commit is contained in:
Tao Chen
2025-12-10 10:16:10 -08:00
committed by GitHub
Unverified
parent 3f4eeb00be
commit 523305ac62
6 changed files with 413 additions and 84 deletions
@@ -37,7 +37,7 @@ confusion and to mirror how the concurrent builder uses explicit dispatcher/aggr
""" # noqa: E501
import logging
from collections.abc import Sequence
from collections.abc import Callable, Sequence
from typing import Any
from agent_framework import AgentProtocol, ChatMessage
@@ -72,11 +72,7 @@ class _InputToConversation(Executor):
await ctx.send_message(normalize_messages_input(message))
@handler
async def from_messages(
self,
messages: list[str | ChatMessage],
ctx: WorkflowContext[list[ChatMessage]],
) -> None:
async def from_messages(self, messages: list[str | ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None:
await ctx.send_message(normalize_messages_input(messages))
@@ -102,7 +98,10 @@ class _EndWithConversation(Executor):
class SequentialBuilder:
r"""High-level builder for sequential agent/executor workflows with shared context.
- `participants([...])` accepts a list of AgentProtocol (recommended) or Executor
- `participants([...])` accepts a list of AgentProtocol (recommended) or Executor instances
- `register_participants([...])` accepts a list of factories for AgentProtocol (recommended)
or Executor factories
- Executors must define a handler that consumes list[ChatMessage] and sends out a list[ChatMessage]
- The workflow wires participants in order, passing a list[ChatMessage] down the chain
- Agents append their assistant messages to the conversation
- Custom executors can transform/summarize and return a list[ChatMessage]
@@ -114,8 +113,14 @@ class SequentialBuilder:
from agent_framework import SequentialBuilder
# With agent instances
workflow = SequentialBuilder().participants([agent1, agent2, summarizer_exec]).build()
# With agent factories
workflow = (
SequentialBuilder().register_participants([create_agent1, create_agent2, create_summarizer_exec]).build()
)
# Enable checkpoint persistence
workflow = SequentialBuilder().participants([agent1, agent2]).with_checkpointing(storage).build()
@@ -133,16 +138,38 @@ class SequentialBuilder:
def __init__(self) -> None:
self._participants: list[AgentProtocol | Executor] = []
self._participant_factories: list[Callable[[], AgentProtocol | Executor]] = []
self._checkpoint_storage: CheckpointStorage | None = None
self._request_info_enabled: bool = False
self._request_info_filter: set[str] | None = None
def register_participants(
self,
participant_factories: Sequence[Callable[[], AgentProtocol | Executor]],
) -> "SequentialBuilder":
"""Register participant factories for this sequential workflow."""
if self._participants:
raise ValueError(
"Cannot mix .participants([...]) and .register_participants() in the same builder instance."
)
if not participant_factories:
raise ValueError("participant_factories cannot be empty")
self._participant_factories = list(participant_factories)
return self
def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "SequentialBuilder":
"""Define the ordered participants for this sequential workflow.
Accepts AgentProtocol instances (auto-wrapped as AgentExecutor) or Executor instances.
Raises if empty or duplicates are provided for clarity.
"""
if self._participant_factories:
raise ValueError(
"Cannot mix .participants([...]) and .register_participants() in the same builder instance."
)
if not participants:
raise ValueError("participants cannot be empty")
@@ -217,13 +244,22 @@ class SequentialBuilder:
- _InputToConversation normalizes the initial input into list[ChatMessage]
- For each participant in order:
- If Agent (or AgentExecutor): pass conversation to the agent, then optionally
route through human input interceptor, then convert response to conversation
route through a request info interceptor, then convert response to conversation
via _ResponseToConversation
- Else (custom Executor): pass conversation directly to the executor
- _EndWithConversation yields the final conversation and the workflow becomes idle
"""
if not self._participants:
raise ValueError("No participants provided. Call .participants([...]) first.")
if not self._participants and not self._participant_factories:
raise ValueError(
"No participants or participant factories provided to the builder. "
"Use .participants([...]) or .ss([...])."
)
if self._participants and self._participant_factories:
# Defensive strategy: this should never happen due to checks in respective methods
raise ValueError(
"Cannot mix .participants([...]) and .register_participants() in the same builder instance."
)
# Internal nodes
input_conv = _InputToConversation(id="input-conversation")
@@ -235,13 +271,17 @@ class SequentialBuilder:
# Start of the chain is the input normalizer
prior: Executor | AgentProtocol = input_conv
for p in self._participants:
# Agent-like branch: either explicitly an AgentExecutor or any non-AgentExecutor
if not (isinstance(p, Executor) and not isinstance(p, AgentExecutor)):
# input conversation -> [human_input_interceptor] -> (agent) -> response -> conversation
label: str
label = p.id if isinstance(p, Executor) else getattr(p, "name", None) or p.__class__.__name__
resp_to_conv = _ResponseToConversation(id=f"to-conversation:{label}")
participants: list[Executor | AgentProtocol] = []
if self._participant_factories:
for factory in self._participant_factories:
p = factory()
participants.append(p)
else:
participants = self._participants
for p in participants:
if isinstance(p, (AgentProtocol, AgentExecutor)):
label = p.id if isinstance(p, AgentExecutor) else p.display_name
if self._request_info_enabled:
# Insert request info interceptor BEFORE the agent
@@ -254,13 +294,15 @@ class SequentialBuilder:
else:
builder.add_edge(prior, p)
resp_to_conv = _ResponseToConversation(id=f"to-conversation:{label}")
builder.add_edge(p, resp_to_conv)
prior = resp_to_conv
elif isinstance(p, Executor):
# Custom executor operates on list[ChatMessage]
# If the executor doesn't handle list[ChatMessage] correctly, validation will fail
builder.add_edge(prior, p)
prior = p
else: # pragma: no cover - defensive
else:
raise TypeError(f"Unsupported participant type: {type(p).__name__}")
# Terminate with the final conversation
@@ -1,7 +1,6 @@
# Copyright (c) Microsoft. All rights reserved.
import logging
from dataclasses import fields, is_dataclass
from types import UnionType
from typing import Any, TypeVar, Union, cast, get_args, get_origin
@@ -10,67 +9,6 @@ logger = logging.getLogger(__name__)
T = TypeVar("T")
def _coerce_to_type(value: Any, target_type: type[T]) -> T | None:
"""Best-effort conversion of value into target_type.
Args:
value: The value to convert (can be dict, dataclass, or object with __dict__)
target_type: The target type to convert to
Returns:
Instance of target_type if conversion succeeds, None otherwise
"""
if isinstance(value, target_type):
return value # type: ignore[return-value]
# Convert dataclass instances or objects with __dict__ into dict first
value_as_dict: dict[str, Any]
if not isinstance(value, dict):
if is_dataclass(value):
value_as_dict = {f.name: getattr(value, f.name) for f in fields(value)}
else:
value_dict = getattr(value, "__dict__", None)
if isinstance(value_dict, dict):
value_as_dict = cast(dict[str, Any], value_dict)
else:
return None
else:
value_as_dict = cast(dict[str, Any], value)
# Try to construct the target type from the dict
ctor_kwargs: dict[str, Any] = dict(value_as_dict)
if is_dataclass(target_type):
field_names = {f.name for f in fields(target_type)}
ctor_kwargs = {k: v for k, v in value_as_dict.items() if k in field_names}
try:
return target_type(**ctor_kwargs) # type: ignore[call-arg,return-value]
except TypeError as exc:
logger.debug(f"_coerce_to_type could not call {target_type.__name__}(**..): {exc}")
except Exception as exc: # pragma: no cover - unexpected constructor failure
logger.warning(
f"_coerce_to_type encountered unexpected error calling {target_type.__name__} constructor: {exc}"
)
# Fallback: try to create instance without __init__ and set attributes
try:
instance = object.__new__(target_type)
except Exception as exc: # pragma: no cover - pathological type
logger.debug(f"_coerce_to_type could not allocate {target_type.__name__} without __init__: {exc}")
return None
for key, val in value_as_dict.items():
try:
setattr(instance, key, val)
except Exception as exc:
logger.debug(
f"_coerce_to_type could not set {target_type.__name__}.{key} during fallback assignment: {exc}"
)
continue
return instance # type: ignore[return-value]
def is_instance_of(data: Any, target_type: type | UnionType | Any) -> bool:
"""Check if the data is an instance of the target type.
@@ -15,6 +15,7 @@ from agent_framework import (
Role,
SequentialBuilder,
TextContent,
TypeCompatibilityError,
WorkflowContext,
WorkflowOutputEvent,
WorkflowRunState,
@@ -58,11 +59,43 @@ class _SummarizerExec(Executor):
await ctx.send_message(list(conversation) + [summary])
class _InvalidExecutor(Executor):
"""Invalid executor that does not have a handler that accepts a list of chat messages"""
@handler
async def summarize(self, conversation: list[str], ctx: WorkflowContext[list[ChatMessage]]) -> None:
pass
def test_sequential_builder_rejects_empty_participants() -> None:
with pytest.raises(ValueError):
SequentialBuilder().participants([])
def test_sequential_builder_rejects_empty_participant_factories() -> None:
with pytest.raises(ValueError):
SequentialBuilder().register_participants([])
def test_sequential_builder_rejects_mixing_participants_and_factories() -> None:
"""Test that mixing .participants() and .register_participants() raises an error."""
a1 = _EchoAgent(id="agent1", name="A1")
# Try .participants() then .register_participants()
with pytest.raises(ValueError, match="Cannot mix"):
SequentialBuilder().participants([a1]).register_participants([lambda: _EchoAgent(id="agent2", name="A2")])
# Try .register_participants() then .participants()
with pytest.raises(ValueError, match="Cannot mix"):
SequentialBuilder().register_participants([lambda: _EchoAgent(id="agent1", name="A1")]).participants([a1])
def test_sequential_builder_validation_rejects_invalid_executor() -> None:
"""Test that adding an invalid executor to the builder raises an error."""
with pytest.raises(TypeCompatibilityError):
SequentialBuilder().participants([_EchoAgent(id="agent1", name="A1"), _InvalidExecutor(id="invalid")]).build()
async def test_sequential_agents_append_to_context() -> None:
a1 = _EchoAgent(id="agent1", name="A1")
a2 = _EchoAgent(id="agent2", name="A2")
@@ -91,6 +124,37 @@ async def test_sequential_agents_append_to_context() -> None:
assert "A2 reply" in msgs[2].text
async def test_sequential_register_participants_with_agent_factories() -> None:
"""Test that register_participants works with agent factories."""
def create_agent1() -> _EchoAgent:
return _EchoAgent(id="agent1", name="A1")
def create_agent2() -> _EchoAgent:
return _EchoAgent(id="agent2", name="A2")
wf = SequentialBuilder().register_participants([create_agent1, create_agent2]).build()
completed = False
output: list[ChatMessage] | None = None
async for ev in wf.run_stream("hello factories"):
if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE:
completed = True
elif isinstance(ev, WorkflowOutputEvent):
output = ev.data
if completed and output is not None:
break
assert completed
assert output is not None
assert isinstance(output, list)
msgs: list[ChatMessage] = output
assert len(msgs) == 3
assert msgs[0].role == Role.USER and "hello factories" in msgs[0].text
assert msgs[1].role == Role.ASSISTANT and "A1 reply" in msgs[1].text
assert msgs[2].role == Role.ASSISTANT and "A2 reply" in msgs[2].text
async def test_sequential_with_custom_executor_summary() -> None:
a1 = _EchoAgent(id="agent1", name="A1")
summarizer = _SummarizerExec(id="summarizer")
@@ -103,7 +167,7 @@ async def test_sequential_with_custom_executor_summary() -> None:
if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE:
completed = True
elif isinstance(ev, WorkflowOutputEvent):
output = ev.data # type: ignore[assignment]
output = ev.data
if completed and output is not None:
break
@@ -117,6 +181,37 @@ async def test_sequential_with_custom_executor_summary() -> None:
assert msgs[2].role == Role.ASSISTANT and msgs[2].text.startswith("Summary of users:")
async def test_sequential_register_participants_mixed_agents_and_executors() -> None:
"""Test register_participants with both agent and executor factories."""
def create_agent() -> _EchoAgent:
return _EchoAgent(id="agent1", name="A1")
def create_summarizer() -> _SummarizerExec:
return _SummarizerExec(id="summarizer")
wf = SequentialBuilder().register_participants([create_agent, create_summarizer]).build()
completed = False
output: list[ChatMessage] | None = None
async for ev in wf.run_stream("topic Y"):
if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE:
completed = True
elif isinstance(ev, WorkflowOutputEvent):
output = ev.data
if completed and output is not None:
break
assert completed
assert output is not None
msgs: list[ChatMessage] = output
# Expect: [user, A1 reply, summary]
assert len(msgs) == 3
assert msgs[0].role == Role.USER and "topic Y" in msgs[0].text
assert msgs[1].role == Role.ASSISTANT and "A1 reply" in msgs[1].text
assert msgs[2].role == Role.ASSISTANT and msgs[2].text.startswith("Summary of users:")
async def test_sequential_checkpoint_resume_round_trip() -> None:
storage = InMemoryCheckpointStorage()
@@ -229,3 +324,130 @@ async def test_sequential_checkpoint_runtime_overrides_buildtime() -> None:
assert len(runtime_checkpoints) > 0, "Runtime storage should have checkpoints"
assert len(buildtime_checkpoints) == 0, "Build-time storage should have no checkpoints when overridden"
async def test_sequential_register_participants_with_checkpointing() -> None:
"""Test that checkpointing works with register_participants."""
storage = InMemoryCheckpointStorage()
def create_agent1() -> _EchoAgent:
return _EchoAgent(id="agent1", name="A1")
def create_agent2() -> _EchoAgent:
return _EchoAgent(id="agent2", name="A2")
wf = SequentialBuilder().register_participants([create_agent1, create_agent2]).with_checkpointing(storage).build()
baseline_output: list[ChatMessage] | None = None
async for ev in wf.run_stream("checkpoint with factories"):
if isinstance(ev, WorkflowOutputEvent):
baseline_output = ev.data
if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE:
break
assert baseline_output is not None
checkpoints = await storage.list_checkpoints()
assert checkpoints
checkpoints.sort(key=lambda cp: cp.timestamp)
resume_checkpoint = next(
(cp for cp in checkpoints if (cp.metadata or {}).get("checkpoint_type") == "superstep"),
checkpoints[-1],
)
wf_resume = (
SequentialBuilder().register_participants([create_agent1, create_agent2]).with_checkpointing(storage).build()
)
resumed_output: list[ChatMessage] | None = None
async for ev in wf_resume.run_stream(checkpoint_id=resume_checkpoint.checkpoint_id):
if isinstance(ev, WorkflowOutputEvent):
resumed_output = ev.data
if isinstance(ev, WorkflowStatusEvent) and ev.state in (
WorkflowRunState.IDLE,
WorkflowRunState.IDLE_WITH_PENDING_REQUESTS,
):
break
assert resumed_output is not None
assert [m.role for m in resumed_output] == [m.role for m in baseline_output]
assert [m.text for m in resumed_output] == [m.text for m in baseline_output]
async def test_sequential_register_participants_factories_called_on_build() -> None:
"""Test that factories are called during build(), not during register_participants()."""
call_count = 0
def create_agent() -> _EchoAgent:
nonlocal call_count
call_count += 1
return _EchoAgent(id=f"agent{call_count}", name=f"A{call_count}")
builder = SequentialBuilder().register_participants([create_agent, create_agent])
# Factories should not be called yet
assert call_count == 0
wf = builder.build()
# Now factories should have been called
assert call_count == 2
# Run the workflow to ensure it works
completed = False
output: list[ChatMessage] | None = None
async for ev in wf.run_stream("test factories timing"):
if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE:
completed = True
elif isinstance(ev, WorkflowOutputEvent):
output = ev.data # type: ignore[assignment]
if completed and output is not None:
break
assert completed
assert output is not None
msgs: list[ChatMessage] = output
# Should have user message + 2 agent replies
assert len(msgs) == 3
async def test_sequential_builder_reusable_after_build_with_participants() -> None:
"""Test that the builder can be reused to build multiple identical workflows with participants()."""
a1 = _EchoAgent(id="agent1", name="A1")
a2 = _EchoAgent(id="agent2", name="A2")
builder = SequentialBuilder().participants([a1, a2])
# Build first workflow
builder.build()
assert builder._participants[0] is a1 # type: ignore
assert builder._participants[1] is a2 # type: ignore
assert builder._participant_factories == [] # type: ignore
async def test_sequential_builder_reusable_after_build_with_factories() -> None:
"""Test that the builder can be reused to build multiple workflows with register_participants()."""
call_count = 0
def create_agent1() -> _EchoAgent:
nonlocal call_count
call_count += 1
return _EchoAgent(id="agent1", name="A1")
def create_agent2() -> _EchoAgent:
nonlocal call_count
call_count += 1
return _EchoAgent(id="agent2", name="A2")
builder = SequentialBuilder().register_participants([create_agent1, create_agent2])
# Build first workflow - factories should be called
builder.build()
assert call_count == 2
assert builder._participants == [] # type: ignore
assert len(builder._participant_factories) == 2 # type: ignore
assert builder._participant_factories[0] is create_agent1 # type: ignore
assert builder._participant_factories[1] is create_agent2 # type: ignore
@@ -124,6 +124,7 @@ For additional observability samples in Agent Framework, see the [observability
| Magentic + Checkpoint Resume | [orchestration/magentic_checkpoint.py](./orchestration/magentic_checkpoint.py) | Resume Magentic orchestration from saved checkpoints |
| Sequential Orchestration (Agents) | [orchestration/sequential_agents.py](./orchestration/sequential_agents.py) | Chain agents sequentially with shared conversation context |
| Sequential Orchestration (Custom Executor) | [orchestration/sequential_custom_executors.py](./orchestration/sequential_custom_executors.py) | Mix agents with a summarizer that appends a compact summary |
| Sequential Orchestration (Participant Factories) | [orchestration/sequential_participant_factory.py](./orchestration/sequential_participant_factory.py) | Use participant factories for state isolation between workflow instances |
**Magentic checkpointing tip**: Treat `MagenticBuilder.participants` keys as stable identifiers. When resuming from a checkpoint, the rebuilt workflow must reuse the same participant names; otherwise the checkpoint cannot be applied and the run will fail fast.
@@ -13,7 +13,6 @@ from agent_framework import (
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
from typing_extensions import Never
"""
Sample: Sequential workflow mixing agents and a custom summarizer executor
@@ -42,12 +41,12 @@ class Summarizer(Executor):
"""Simple summarizer: consumes full conversation and appends an assistant summary."""
@handler
async def summarize(self, conversation: list[ChatMessage], ctx: WorkflowContext[Never, list[ChatMessage]]) -> None:
async def summarize(self, conversation: list[ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None:
users = sum(1 for m in conversation if m.role == Role.USER)
assistants = sum(1 for m in conversation if m.role == Role.ASSISTANT)
summary = ChatMessage(role=Role.ASSISTANT, text=f"Summary -> users:{users} assistants:{assistants}")
final_conversation = list(conversation) + [summary]
await ctx.yield_output(final_conversation)
await ctx.send_message(final_conversation)
async def main() -> None:
@@ -0,0 +1,127 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from agent_framework import (
ChatAgent,
ChatMessage,
Executor,
Role,
SequentialBuilder,
Workflow,
WorkflowContext,
handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
"""
Sample: Sequential workflow with participant factories
This sample demonstrates how to create a sequential workflow with participant factories.
Using participant factories allows you to set up proper state isolation between workflow
instances created by the same builder. This is particularly useful when you need to handle
requests or tasks in parallel with stateful participants.
In this example, we create a sequential workflow with two participants: an accumulator
and a content producer. The accumulator is stateful and maintains a list of all messages it has
received. Context is maintained across runs of the same workflow instance but not across different
workflow instances.
"""
class Accumulate(Executor):
"""Simple accumulator.
Accumulates all messages from the conversation and prints them out.
"""
def __init__(self, id: str):
super().__init__(id)
# Some internal state to accumulate messages
self._accumulated: list[str] = []
@handler
async def accumulate(self, conversation: list[ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None:
self._accumulated.extend([msg.text for msg in conversation])
print(f"Number of queries received so far: {len(self._accumulated)}")
await ctx.send_message(conversation)
def create_agent() -> ChatAgent:
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions="Produce a concise paragraph answering the user's request.",
name="ContentProducer",
)
async def run_workflow(workflow: Workflow, query: str) -> None:
events = await workflow.run(query)
outputs = events.get_outputs()
if outputs:
messages: list[ChatMessage] = outputs[0]
for message in messages:
name = message.author_name or ("assistant" if message.role == Role.ASSISTANT else "user")
print(f"{name}: {message.text}")
else:
raise RuntimeError("No outputs received from the workflow.")
async def main() -> None:
# 1) Create a builder with participant factories
builder = SequentialBuilder().register_participants([
lambda: Accumulate("accumulator"),
create_agent,
])
# 2) Build workflow_a
workflow_a = builder.build()
# 3) Run workflow_a
# Context is maintained across runs
print("=== First Run on workflow_a ===")
await run_workflow(workflow_a, "Why is the sky blue?")
print("\n=== Second Run on workflow_a ===")
await run_workflow(workflow_a, "Repeat my previous question.")
# 4) Build workflow_b
# This will create a new instance of the accumulator and content producer
# using the same workflow builder
workflow_b = builder.build()
# 5) Run workflow_b
# Context is not maintained across instances
print("\n=== First Run on workflow_b ===")
await run_workflow(workflow_b, "Repeat my previous question.")
"""
Sample Output:
=== First Run on workflow_a ===
Number of queries received so far: 1
user: Why is the sky blue?
ContentProducer: The sky appears blue due to a phenomenon called Rayleigh scattering.
When sunlight enters the Earth's atmosphere, it collides with gases
and particles, scattering shorter wavelengths of light (blue and violet)
more than the longer wavelengths (red and yellow). Although violet light
is scattered even more than blue, our eyes are more sensitive to blue
light, and some violet light is absorbed by the ozone layer. As a result,
we perceive the sky as predominantly blue during the day.
=== Second Run on workflow_a ===
Number of queries received so far: 2
user: Repeat my previous question.
ContentProducer: Why is the sky blue?
=== First Run on workflow_b ===
Number of queries received so far: 1
user: Repeat my previous question.
ContentProducer: I'm sorry, but I can't repeat your previous question as I don't have
access to your past queries. However, feel free to ask anything again,
and I'll be happy to help!
"""
if __name__ == "__main__":
asyncio.run(main())