Python: Add Sequential orchestration builder support. Samples. Tests. (#703)

* Add support for the Sequential Builder. Add samples. Add tests

* AgentExecutor: always compute full convo during response

* Upgrade azure-ai-agents ToolOutput to FunctionToolOutput

* Explicit notes around allows types for custom agent executors
This commit is contained in:
Evan Mattson
2025-09-16 10:52:34 +09:00
committed by GitHub
Unverified
parent 68b76e6726
commit 5c0b037e2c
11 changed files with 2537 additions and 1936 deletions
@@ -62,6 +62,7 @@ _IMPORTS = [
"WorkflowErrorDetails",
"WorkflowFailedEvent",
"ConcurrentBuilder",
"SequentialBuilder",
]
@@ -30,6 +30,7 @@ from agent_framework_workflow import (
RequestInfoExecutor,
RequestInfoMessage,
RequestResponse,
SequentialBuilder,
StandardMagenticManager,
SubWorkflowRequestInfo,
SubWorkflowResponse,
@@ -84,6 +85,7 @@ __all__ = [
"RequestInfoExecutor",
"RequestInfoMessage",
"RequestResponse",
"SequentialBuilder",
"StandardMagenticManager",
"SubWorkflowRequestInfo",
"SubWorkflowResponse",
@@ -72,6 +72,7 @@ from ._runner_context import (
Message,
RunnerContext,
)
from ._sequential import SequentialBuilder
from ._validation import (
EdgeDuplicationError,
GraphConnectivityError,
@@ -137,6 +138,7 @@ __all__ = [
"RequestInfoMessage",
"RequestResponse",
"RunnerContext",
"SequentialBuilder",
"StandardMagenticManager",
"SubWorkflowRequestInfo",
"SubWorkflowResponse",
@@ -869,11 +869,10 @@ class AgentExecutor(Executor):
)
await ctx.add_event(AgentRunEvent(self.id, response))
full_conversation: list[ChatMessage] | None = None
if self._cache:
# Construct conversation snapshot = inputs (cache) + agent outputs (agent_run_response.messages).
# Do not mutate response.messages so AgentRunEvent remains clean.
full_conversation = list(self._cache) + list(response.messages)
# Always construct a full conversation snapshot from inputs (cache)
# plus agent outputs (agent_run_response.messages). Do not mutate
# response.messages so AgentRunEvent remains faithful to the raw output.
full_conversation: list[ChatMessage] = list(self._cache) + list(response.messages)
agent_response = AgentExecutorResponse(self.id, response, full_conversation=full_conversation)
await ctx.send_message(agent_response)
@@ -888,41 +887,7 @@ class AgentExecutor(Executor):
"""
self._cache.extend(request.messages)
if request.should_respond:
if self._streaming:
updates: list[AgentRunResponseUpdate] = []
async for update in self._agent.run_stream(
self._cache,
thread=self._agent_thread,
):
if not update:
continue
contents = getattr(update, "contents", None)
text_val = getattr(update, "text", "")
has_text_content = False
if contents:
for c in contents:
if getattr(c, "text", None):
has_text_content = True
break
if not (text_val or has_text_content):
continue
updates.append(update)
await ctx.add_event(AgentRunUpdateEvent(self.id, update))
response = AgentRunResponse.from_agent_run_response_updates(updates)
else:
response = await self._agent.run(
self._cache,
thread=self._agent_thread,
)
await ctx.add_event(AgentRunEvent(self.id, response))
full_conversation: list[ChatMessage] | None = None
if self._cache:
full_conversation = list(self._cache) + list(response.messages)
agent_response = AgentExecutorResponse(self.id, response, full_conversation=full_conversation)
await ctx.send_message(agent_response)
self._cache.clear()
await self._run_agent_and_emit(ctx)
@handler
async def from_response(self, prior: AgentExecutorResponse, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
@@ -0,0 +1,187 @@
# Copyright (c) Microsoft. All rights reserved.
"""Sequential builder for agent/executor workflows with shared conversation context.
This module provides a high-level, agent-focused API to assemble a sequential
workflow where:
- Participants are a sequence of AgentProtocol instances or Executors
- A shared conversation context (list[ChatMessage]) is passed along the chain
- Agents append their assistant messages to the context
- Custom executors can transform or summarize and return a refined context
- The workflow completes with the final context produced by the last participant
Typical wiring:
input -> _InputToConversation -> participant1 -> (agent? -> _ResponseToConversation) -> ... -> participantN -> _CompleteWithConversation
Notes:
- Participants can mix AgentProtocol and Executor objects
- Agents are auto-wrapped by WorkflowBuilder as AgentExecutor
- AgentExecutor produces AgentExecutorResponse; _ResponseToConversation converts this to list[ChatMessage]
- Non-agent executors must define a handler that consumes `list[ChatMessage]` and sends back
the updated `list[ChatMessage]` via their workflow context
Why include the small internal adapter executors?
- Input normalization ("input-conversation"): ensures the workflow always starts with a
`list[ChatMessage]` regardless of whether callers pass a `str`, a single `ChatMessage`,
or a list. This keeps the first hop strongly typed and avoids boilerplate in participants.
- Agent response adaptation ("to-conversation:<participant>"): agents (via AgentExecutor)
emit `AgentExecutorResponse`. The adapter converts that to a `list[ChatMessage]`
using `full_conversation` so original prompts aren't lost when chaining.
- Explicit completion ("complete"): emits a `WorkflowCompletedEvent` with the final
conversation list, giving a consistent terminal payload shape for both agents and
custom executors.
These adapters are first-class executors by design so they are type-checked at edges,
observable (ExecutorInvoke/Completed events), and easily testable/reusable. Their IDs are
deterministic and self-describing (for example, "to-conversation:writer") to reduce event-log
confusion and to mirror how the concurrent builder uses explicit dispatcher/aggregator nodes.
""" # noqa: E501
import logging
from collections.abc import Sequence
from typing import Any
from agent_framework import AgentProtocol, ChatMessage, Role
from ._events import WorkflowCompletedEvent
from ._executor import (
AgentExecutor,
AgentExecutorResponse,
Executor,
handler,
)
from ._workflow import Workflow, WorkflowBuilder
from ._workflow_context import WorkflowContext
logger = logging.getLogger(__name__)
class _InputToConversation(Executor):
"""Normalizes initial input into a list[ChatMessage] conversation."""
@handler
async def from_str(self, prompt: str, ctx: WorkflowContext[list[ChatMessage]]) -> None:
await ctx.send_message([ChatMessage(Role.USER, text=prompt)])
@handler
async def from_message(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None: # type: ignore[name-defined]
await ctx.send_message([message])
@handler
async def from_messages(self, messages: list[ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None: # type: ignore[name-defined]
# Make a copy to avoid mutation downstream
await ctx.send_message(list(messages))
class _ResponseToConversation(Executor):
"""Converts AgentExecutorResponse to list[ChatMessage] conversation for chaining."""
@handler
async def convert(self, response: AgentExecutorResponse, ctx: WorkflowContext[list[ChatMessage]]) -> None:
# Always use full_conversation; AgentExecutor guarantees it is populated.
if response.full_conversation is None: # Defensive: indicates a contract violation
raise RuntimeError("AgentExecutorResponse.full_conversation missing. AgentExecutor must populate it.")
await ctx.send_message(list(response.full_conversation))
class _CompleteWithConversation(Executor):
"""Terminates the workflow by emitting the final conversation context."""
@handler
async def complete(self, conversation: list[ChatMessage], ctx: WorkflowContext[Any]) -> None:
await ctx.add_event(WorkflowCompletedEvent(data=list(conversation)))
class SequentialBuilder:
r"""High-level builder for sequential agent/executor workflows with shared context.
- `participants([...])` accepts a list of AgentProtocol (recommended) or Executor
- The workflow wires participants in order, passing a list[ChatMessage] down the chain
- Agents append their assistant messages to the conversation
- Custom executors can transform/summarize and return a list[ChatMessage]
- The final output is the conversation produced by the last participant
Usage:
```python
from agent_framework.workflow import SequentialBuilder
workflow = SequentialBuilder().participants([agent1, agent2, summarizer_exec]).build()
```
"""
def __init__(self) -> None:
self._participants: list[AgentProtocol | Executor] = []
def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "SequentialBuilder":
"""Define the ordered participants for this sequential workflow.
Accepts AgentProtocol instances (auto-wrapped as AgentExecutor) or Executor instances.
Raises if empty or duplicates are provided for clarity.
"""
if not participants:
raise ValueError("participants cannot be empty")
# Defensive duplicate detection
seen_agent_ids: set[int] = set()
seen_executor_ids: set[str] = set()
for p in participants:
if isinstance(p, Executor):
if p.id in seen_executor_ids:
raise ValueError(f"Duplicate executor participant detected: id '{p.id}'")
seen_executor_ids.add(p.id)
else:
# Treat non-Executor as agent-like (AgentProtocol). Structural checks can be brittle at runtime.
pid = id(p)
if pid in seen_agent_ids:
raise ValueError("Duplicate agent participant detected (same agent instance provided twice)")
seen_agent_ids.add(pid)
self._participants = list(participants)
return self
def build(self) -> Workflow:
"""Build and validate the sequential workflow.
Wiring pattern:
- _InputToConversation normalizes the initial input into list[ChatMessage]
- For each participant in order:
- If Agent (or AgentExecutor): pass conversation to the agent, then convert response
to conversation via _ResponseToConversation
- Else (custom Executor): pass conversation directly to the executor
- _CompleteWithConversation emits WorkflowCompletedEvent with the final conversation
"""
if not self._participants:
raise ValueError("No participants provided. Call .participants([...]) first.")
# Internal nodes
input_conv = _InputToConversation(id="input-conversation")
complete = _CompleteWithConversation(id="complete")
builder = WorkflowBuilder()
builder.set_start_executor(input_conv)
# Start of the chain is the input normalizer
prior: Executor | AgentProtocol = input_conv
for p in self._participants:
# Agent-like branch: either explicitly an AgentExecutor or any non-AgentExecutor
if not (isinstance(p, Executor) and not isinstance(p, AgentExecutor)):
# input conversation -> (agent) -> response -> conversation
builder.add_edge(prior, p)
# Give the adapter a deterministic, self-describing id
label: str
label = p.id if isinstance(p, Executor) else getattr(p, "name", None) or p.__class__.__name__
resp_to_conv = _ResponseToConversation(id=f"to-conversation:{label}")
builder.add_edge(p, resp_to_conv)
prior = resp_to_conv
elif isinstance(p, Executor):
# Custom executor operates on list[ChatMessage]
builder.add_edge(prior, p)
prior = p
else: # pragma: no cover - defensive
raise TypeError(f"Unsupported participant type: {type(p).__name__}")
# Terminate with the final conversation
builder.add_edge(prior, complete)
return builder.build()
@@ -0,0 +1,160 @@
# Copyright (c) Microsoft. All rights reserved.
from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
AgentRunResponse,
AgentRunResponseUpdate,
AgentThread,
BaseAgent,
ChatMessage,
Role,
TextContent,
)
from pydantic import PrivateAttr
from agent_framework_workflow import (
AgentExecutor,
SequentialBuilder,
WorkflowBuilder,
WorkflowCompletedEvent,
WorkflowContext,
handler,
)
from agent_framework_workflow._executor import AgentExecutorResponse, Executor
class _SimpleAgent(BaseAgent):
"""Agent that returns a single assistant message (non-streaming path)."""
def __init__(self, *, reply_text: str, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._reply_text = reply_text
async def run( # type: ignore[override]
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse:
return AgentRunResponse(messages=[ChatMessage(role=Role.ASSISTANT, text=self._reply_text)])
async def run_stream( # type: ignore[override]
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
# This agent does not support streaming; yield a single complete response
yield AgentRunResponseUpdate(contents=[TextContent(text=self._reply_text)])
class _CaptureFullConversation(Executor):
"""Captures AgentExecutorResponse.full_conversation and completes the workflow."""
@handler
async def capture(self, response: AgentExecutorResponse, ctx: WorkflowContext[None]) -> None:
full = response.full_conversation
# The AgentExecutor contract guarantees full_conversation is populated.
assert full is not None
await ctx.add_event(
WorkflowCompletedEvent(
data={
"length": len(full),
"roles": [m.role for m in full],
"texts": [m.text for m in full],
}
)
)
async def test_agent_executor_populates_full_conversation_non_streaming() -> None:
# Arrange: non-streaming AgentExecutor for deterministic response composition
agent = _SimpleAgent(id="agent1", name="A", reply_text="agent-reply")
agent_exec = AgentExecutor(agent, streaming=False, id="agent1-exec")
capturer = _CaptureFullConversation(id="capture")
wf = WorkflowBuilder().set_start_executor(agent_exec).add_edge(agent_exec, capturer).build()
# Act: run with a simple user prompt
completed: WorkflowCompletedEvent | None = None
async for ev in wf.run_stream("hello world"):
if isinstance(ev, WorkflowCompletedEvent):
completed = ev
break
# Assert: full_conversation contains [user("hello world"), assistant("agent-reply")]
assert completed is not None
payload = completed.data # type: ignore[assignment]
assert isinstance(payload, dict)
assert payload["length"] == 2
assert payload["roles"][0] == Role.USER and "hello world" in (payload["texts"][0] or "")
assert payload["roles"][1] == Role.ASSISTANT and "agent-reply" in (payload["texts"][1] or "")
class _CaptureAgent(BaseAgent):
"""Streaming-capable agent that records the messages it received."""
_last_messages: list[ChatMessage] = PrivateAttr(default_factory=list) # type: ignore
def __init__(self, *, reply_text: str, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._reply_text = reply_text
async def run( # type: ignore[override]
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse:
# Normalize and record messages for verification when running non-streaming
norm: list[ChatMessage] = []
if messages:
for m in messages: # type: ignore[iteration-over-optional]
if isinstance(m, ChatMessage):
norm.append(m)
elif isinstance(m, str):
norm.append(ChatMessage(role=Role.USER, text=m))
self._last_messages = norm
return AgentRunResponse(messages=[ChatMessage(role=Role.ASSISTANT, text=self._reply_text)])
async def run_stream( # type: ignore[override]
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
# Normalize and record messages for verification when running streaming
norm: list[ChatMessage] = []
if messages:
for m in messages: # type: ignore[iteration-over-optional]
if isinstance(m, ChatMessage):
norm.append(m)
elif isinstance(m, str):
norm.append(ChatMessage(role=Role.USER, text=m))
self._last_messages = norm
yield AgentRunResponseUpdate(contents=[TextContent(text=self._reply_text)])
async def test_sequential_adapter_uses_full_conversation() -> None:
# Arrange: two streaming agents; the second records what it receives
a1 = _CaptureAgent(id="agent1", name="A1", reply_text="A1 reply")
a2 = _CaptureAgent(id="agent2", name="A2", reply_text="A2 reply")
wf = SequentialBuilder().participants([a1, a2]).build()
# Act
async for ev in wf.run_stream("hello seq"):
if isinstance(ev, WorkflowCompletedEvent):
break
# Assert: second agent should have seen the user prompt and A1's assistant reply
seen = a2._last_messages # pyright: ignore[reportPrivateUsage]
assert len(seen) == 2
assert seen[0].role == Role.USER and "hello seq" in (seen[0].text or "")
assert seen[1].role == Role.ASSISTANT and "A1 reply" in (seen[1].text or "")
@@ -0,0 +1,106 @@
# Copyright (c) Microsoft. All rights reserved.
from collections.abc import AsyncIterable
from typing import Any
import pytest
from agent_framework import (
AgentRunResponse,
AgentRunResponseUpdate,
AgentThread,
BaseAgent,
ChatMessage,
Role,
TextContent,
)
from agent_framework_workflow import (
Executor,
SequentialBuilder,
WorkflowCompletedEvent,
WorkflowContext,
handler,
)
class _EchoAgent(BaseAgent):
"""Simple agent that appends a single assistant message with its name."""
async def run( # type: ignore[override]
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse:
return AgentRunResponse(messages=[ChatMessage(role=Role.ASSISTANT, text=f"{self.display_name} reply")])
async def run_stream( # type: ignore[override]
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
# Minimal async generator with one assistant update
yield AgentRunResponseUpdate(contents=[TextContent(text=f"{self.display_name} reply")])
class _SummarizerExec(Executor):
"""Custom executor that summarizes by appending a short assistant message."""
@handler
async def summarize(self, conversation: list[ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None:
user_texts = [m.text for m in conversation if m.role == Role.USER]
agents = [m.author_name or m.role for m in conversation if m.role == Role.ASSISTANT]
summary = ChatMessage(role=Role.ASSISTANT, text=f"Summary of users:{len(user_texts)} agents:{len(agents)}")
await ctx.send_message(list(conversation) + [summary])
def test_sequential_builder_rejects_empty_participants() -> None:
with pytest.raises(ValueError):
SequentialBuilder().participants([])
async def test_sequential_agents_append_to_context() -> None:
a1 = _EchoAgent(id="agent1", name="A1")
a2 = _EchoAgent(id="agent2", name="A2")
wf = SequentialBuilder().participants([a1, a2]).build()
completed: WorkflowCompletedEvent | None = None
async for ev in wf.run_stream("hello sequential"):
if isinstance(ev, WorkflowCompletedEvent):
completed = ev
break
assert completed is not None
assert isinstance(completed.data, list)
msgs: list[ChatMessage] = completed.data # type: ignore[assignment]
assert len(msgs) == 3
assert msgs[0].role == Role.USER and "hello sequential" in msgs[0].text
assert msgs[1].role == Role.ASSISTANT and (msgs[1].author_name == "A1" or True)
assert msgs[2].role == Role.ASSISTANT and (msgs[2].author_name == "A2" or True)
assert "A1 reply" in msgs[1].text
assert "A2 reply" in msgs[2].text
async def test_sequential_with_custom_executor_summary() -> None:
a1 = _EchoAgent(id="agent1", name="A1")
summarizer = _SummarizerExec(id="summarizer")
wf = SequentialBuilder().participants([a1, summarizer]).build()
completed: WorkflowCompletedEvent | None = None
async for ev in wf.run_stream("topic X"):
if isinstance(ev, WorkflowCompletedEvent):
completed = ev
break
assert completed is not None
msgs: list[ChatMessage] = completed.data # type: ignore[assignment]
# Expect: [user, A1 reply, summary]
assert len(msgs) == 3
assert msgs[0].role == Role.USER
assert msgs[1].role == Role.ASSISTANT and "A1 reply" in msgs[1].text
assert msgs[2].role == Role.ASSISTANT and msgs[2].text.startswith("Summary of users:")
@@ -83,6 +83,8 @@ Once comfortable with these, explore the rest of the samples below.
| Concurrent Orchestration (Custom Agent Executors) | [orchestration/concurrent_custom_agent_executors.py](./orchestration/concurrent_custom_agent_executors.py) | Child executors own ChatAgents; concurrent fan-out/fan-in via ConcurrentBuilder |
| Magentic Workflow (Multi-Agent) | [orchestration/magentic.py](./orchestration/magentic.py) | Orchestrate multiple agents with Magentic manager and streaming |
| Magentic + Human Plan Review | [orchestration/magentic_human_plan_update.py](./orchestration/magentic_human_plan_update.py) | Human reviews/updates the plan before execution |
| Sequential Orchestration (Agents) | [orchestration/sequential_agents.py](./orchestration/sequential_agents.py) | Chain agents sequentially with shared conversation context |
| Sequential Orchestration (Custom Executor) | [orchestration/sequential_custom_executors.py](./orchestration/sequential_custom_executors.py) | Mix agents with a summarizer that appends a compact summary |
### parallelism
| Sample | File | Concepts |
@@ -110,6 +112,13 @@ Once comfortable with these, explore the rest of the samples below.
Notes
- Agent-based samples use provider SDKs (Azure/OpenAI, etc.). Ensure credentials are configured, or adapt agents accordingly.
Sequential orchestration uses a few small adapter nodes for plumbing:
- "input-conversation" normalizes input to `list[ChatMessage]`
- "to-conversation:<participant>" converts agent responses into the shared conversation
- "complete" publishes the final `WorkflowCompletedEvent`
These may appear in event streams (ExecutorInvoke/Completed). Theyre analogous to
concurrents dispatcher and aggregator and can be ignored if you only care about agent activity.
### Environment Variables
- **AzureChatClient**: Set Azure OpenAI environment variables as documented [here](https://github.com/microsoft/agent-framework/blob/main/python/samples/getting_started/chat_client/README.md#environment-variables).
@@ -0,0 +1,80 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any
from agent_framework import ChatMessage, Role
from agent_framework.azure import AzureChatClient
from agent_framework.workflow import SequentialBuilder, WorkflowCompletedEvent
from azure.identity import AzureCliCredential
"""
Sample: Sequential workflow (agent-focused API) with shared conversation context
Build a high-level sequential workflow using SequentialBuilder and two domain agents.
The shared conversation (list[ChatMessage]) flows through each participant. Each agent
appends its assistant message to the context. The final WorkflowCompletedEvent includes
the final conversation list.
Note on internal adapters:
- Sequential orchestration includes small adapter nodes for input normalization
("input-conversation"), agent-response conversion ("to-conversation:<participant>"),
and completion ("complete"). These may appear as ExecutorInvoke/Completed events in
the stream—similar to how concurrent orchestration includes a dispatcher/aggregator.
You can safely ignore them when focusing on agent progress.
Prerequisites:
- Azure OpenAI access configured for AzureChatClient (use az login + env vars)
"""
async def main() -> None:
# 1) Create agents
chat_client = AzureChatClient(credential=AzureCliCredential())
writer = chat_client.create_agent(
instructions=("You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."),
name="writer",
)
reviewer = chat_client.create_agent(
instructions=("You are a thoughtful reviewer. Give brief feedback on the previous assistant message."),
name="reviewer",
)
# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder().participants([writer, reviewer]).build()
# 3) Run and print final conversation
completion: WorkflowCompletedEvent | None = None
async for event in workflow.run_stream("Write a tagline for a budget-friendly eBike."):
if isinstance(event, WorkflowCompletedEvent):
completion = event
if completion:
print("===== Final Conversation =====")
messages: list[ChatMessage] | Any = completion.data
for i, msg in enumerate(messages, start=1):
name = msg.author_name or ("assistant" if msg.role == Role.ASSISTANT else "user")
print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")
"""
Sample Output:
===== Final Conversation =====
------------------------------------------------------------
01 [user]
Write a tagline for a budget-friendly eBike.
------------------------------------------------------------
02 [writer]
Ride farther, spend less—your affordable eBike adventure starts here.
------------------------------------------------------------
03 [reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!
"""
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,91 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any
from agent_framework import ChatMessage, Role
from agent_framework.azure import AzureChatClient
from agent_framework.workflow import Executor, SequentialBuilder, WorkflowCompletedEvent, WorkflowContext, handler
from azure.identity import AzureCliCredential
"""
Sample: Sequential workflow mixing agents and a custom summarizer executor
This demonstrates how SequentialBuilder chains participants with a shared
conversation context (list[ChatMessage]). An agent produces content; a custom
executor appends a compact summary to the conversation. The final WorkflowCompletedEvent
contains the complete conversation.
Custom executor contract:
- Provide at least one @handler accepting list[ChatMessage] and a WorkflowContext[list[ChatMessage]]
- Emit the updated conversation via ctx.send_message([...])
Note on internal adapters:
- You may see adapter nodes in the event stream such as "input-conversation",
"to-conversation:<participant>", and "complete". These provide consistent typing,
conversion of agent responses into the shared conversation, and a single point
for completion—similar to concurrent's dispatcher/aggregator.
Prerequisites:
- Azure OpenAI access configured for AzureChatClient (use az login + env vars)
"""
class Summarizer(Executor):
"""Simple summarizer: consumes full conversation and appends an assistant summary."""
@handler
async def summarize(self, conversation: list[ChatMessage], ctx: WorkflowContext[list[ChatMessage]]) -> None:
users = sum(1 for m in conversation if m.role == Role.USER)
assistants = sum(1 for m in conversation if m.role == Role.ASSISTANT)
summary = ChatMessage(role=Role.ASSISTANT, text=f"Summary -> users:{users} assistants:{assistants}")
await ctx.send_message(list(conversation) + [summary])
async def main() -> None:
# 1) Create a content agent
chat_client = AzureChatClient(credential=AzureCliCredential())
content = chat_client.create_agent(
instructions="Produce a concise paragraph answering the user's request.",
name="content",
)
# 2) Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder().participants([content, summarizer]).build()
# 3) Run and print final conversation
completion: WorkflowCompletedEvent | None = None
async for event in workflow.run_stream("Explain the benefits of budget eBikes for commuters."):
if isinstance(event, WorkflowCompletedEvent):
completion = event
if completion:
print("===== Final Conversation =====")
messages: list[ChatMessage] | Any = completion.data
for i, msg in enumerate(messages, start=1):
name = msg.author_name or ("assistant" if msg.role == Role.ASSISTANT else "user")
print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")
"""
Sample Output:
------------------------------------------------------------
01 [user]
Explain the benefits of budget eBikes for commuters.
------------------------------------------------------------
02 [content]
Budget eBikes offer commuters an affordable, eco-friendly alternative to cars and public transport.
Their electric assistance reduces physical strain and allows riders to cover longer distances quickly,
minimizing travel time and fatigue. Budget models are low-cost to maintain and operate, making them accessible
for a wider range of people. Additionally, eBikes help reduce traffic congestion and carbon emissions,
supporting greener urban environments. Overall, budget eBikes provide cost-effective, efficient, and
sustainable transportation for daily commuting needs.
------------------------------------------------------------
03 [assistant]
Summary -> users:1 assistants:1
"""
if __name__ == "__main__":
asyncio.run(main())
+1894 -1896
View File
File diff suppressed because it is too large Load Diff