[BREAKING] Python: Standardize orchestration outputs as list of ChatMessage. Allow agent as group chat manager. (#2291)

* Standardize orchestration outputs as list of chatmessage. Add chat options to group chat prompt manager

* refactor group chat

* Improve group chat manager

* README Update

* Cleanup

* Add comment

* More cleanup

* Standardize termination condition for group chat

* Improvements on termination logic

* Fix tests

* Fix new line

* PR feedback

* Update ChatKit based on OpenAI type change

* Raise error if response format is not expected type

* Only one starting executor required. Add tests.

* Add magentic start executor test
This commit is contained in:
Evan Mattson
2025-11-26 16:51:04 +09:00
committed by GitHub
Unverified
parent ed53ba158b
commit 907d79ab3c
19 changed files with 1724 additions and 573 deletions
@@ -27,6 +27,7 @@ from chatkit.types import (
EndOfTurnItem,
HiddenContextItem,
ImageAttachment,
SDKHiddenContextItem,
TaskItem,
ThreadItem,
UserMessageItem,
@@ -180,8 +181,10 @@ class ThreadItemConverter:
# Subclasses can override this method to provide custom handling
return None
def hidden_context_to_input(self, item: HiddenContextItem) -> ChatMessage | list[ChatMessage] | None:
"""Convert a ChatKit HiddenContextItem to Agent Framework ChatMessage(s).
def hidden_context_to_input(
self, item: HiddenContextItem | SDKHiddenContextItem
) -> ChatMessage | list[ChatMessage] | None:
"""Convert a ChatKit HiddenContextItem or SDKHiddenContextItem to Agent Framework ChatMessage(s).
This method is called internally by `to_agent_input()`. Override this method
to customize how hidden context is converted.
@@ -522,6 +525,9 @@ class ThreadItemConverter:
case HiddenContextItem():
out = self.hidden_context_to_input(item) or []
return out if isinstance(out, list) else [out]
case SDKHiddenContextItem():
out = self.hidden_context_to_input(item) or []
return out if isinstance(out, list) else [out]
case _:
assert_never(item)
+1 -1
View File
@@ -23,7 +23,7 @@ classifiers = [
]
dependencies = [
"agent-framework-core",
"openai-chatkit>=1.1.0,<2.0.0",
"openai-chatkit>=1.4.0,<2.0.0",
]
[tool.uv]
@@ -61,6 +61,8 @@ from ._group_chat import (
GroupChatDirective,
GroupChatStateSnapshot,
ManagerDirectiveModel,
ManagerSelectionRequest,
ManagerSelectionResponse,
)
from ._handoff import HandoffBuilder, HandoffUserInputRequest
from ._magentic import (
@@ -147,6 +149,8 @@ __all__ = [
"MagenticPlanReviewReply",
"MagenticPlanReviewRequest",
"ManagerDirectiveModel",
"ManagerSelectionRequest",
"ManagerSelectionResponse",
"Message",
"OrchestrationState",
"RequestInfoEvent",
File diff suppressed because it is too large Load Diff
@@ -1424,6 +1424,7 @@ class HandoffBuilder:
prompt=self._request_prompt,
id="handoff-user-input",
)
builder = WorkflowBuilder(name=self._name, description=self._description).set_start_executor(input_node)
specialist_aliases = {alias: exec_id for alias, exec_id in self._aliases.items() if exec_id in specialists}
@@ -1440,6 +1441,7 @@ class HandoffBuilder:
wiring = _GroupChatConfig(
manager=None,
manager_participant=None,
manager_name=self._starting_agent_id,
participants=participant_specs,
max_rounds=None,
@@ -1453,14 +1455,13 @@ class HandoffBuilder:
orchestrator_factory=_handoff_orchestrator_factory,
interceptors=(),
checkpoint_storage=self._checkpoint_storage,
builder=WorkflowBuilder(name=self._name, description=self._description),
builder=builder,
return_builder=True,
)
if not isinstance(result, tuple):
raise TypeError("Expected tuple from assemble_group_chat_workflow with return_builder=True")
builder, coordinator = result
builder = builder.set_start_executor(input_node)
builder = builder.add_edge(input_node, starting_executor)
builder = builder.add_edge(coordinator, user_gateway)
builder = builder.add_edge(user_gateway, coordinator)
@@ -961,7 +961,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
async def _emit_orchestrator_message(
self,
ctx: WorkflowContext[Any, ChatMessage],
ctx: WorkflowContext[Any, list[ChatMessage]],
message: ChatMessage,
kind: str,
) -> None:
@@ -1110,7 +1110,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
self,
message: _MagenticStartMessage,
context: WorkflowContext[
_MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, ChatMessage
_MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage]
],
) -> None:
"""Handle the initial start message to begin orchestration."""
@@ -1145,7 +1145,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
# Start the inner loop
ctx2 = cast(
WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
context,
)
await self._run_inner_loop(ctx2)
@@ -1155,7 +1155,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
self,
task_text: str,
context: WorkflowContext[
_MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, ChatMessage
_MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage]
],
) -> None:
await self.handle_start_message(_MagenticStartMessage.from_string(task_text), context)
@@ -1165,7 +1165,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
self,
task_message: ChatMessage,
context: WorkflowContext[
_MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, ChatMessage
_MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage]
],
) -> None:
await self.handle_start_message(_MagenticStartMessage(task_message), context)
@@ -1175,7 +1175,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
self,
conversation: list[ChatMessage],
context: WorkflowContext[
_MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, ChatMessage
_MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage]
],
) -> None:
await self.handle_start_message(_MagenticStartMessage(conversation), context)
@@ -1184,7 +1184,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
async def handle_response_message(
self,
message: _MagenticResponseMessage,
context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
) -> None:
"""Handle responses from agents."""
if getattr(self, "_terminated", False):
@@ -1216,7 +1216,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
response: _MagenticPlanReviewReply,
context: WorkflowContext[
# may broadcast ledger next, or ask for another round of review
_MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, ChatMessage
_MagenticResponseMessage | _MagenticRequestMessage | _MagenticPlanReviewRequest, list[ChatMessage]
],
) -> None:
if getattr(self, "_terminated", False):
@@ -1262,7 +1262,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
# Enter the normal coordination loop
ctx2 = cast(
WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
context,
)
await self._run_inner_loop(ctx2)
@@ -1289,7 +1289,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
self._context.chat_history.append(self._task_ledger)
# No further review requests; proceed directly into coordination
ctx2 = cast(
WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
context,
)
await self._run_inner_loop(ctx2)
@@ -1324,7 +1324,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
async def _run_outer_loop(
self,
context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
) -> None:
"""Run the outer orchestration loop - planning phase."""
if self._context is None:
@@ -1347,7 +1347,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
async def _run_inner_loop(
self,
context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
) -> None:
"""Run the inner orchestration loop. Coordination phase. Serialized with a lock."""
if self._context is None or self._task_ledger is None:
@@ -1357,7 +1357,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
async def _run_inner_loop_helper(
self,
context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
) -> None:
"""Run inner loop with exclusive access."""
# Narrow optional context for the remainder of this method
@@ -1442,7 +1442,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
async def _reset_and_replan(
self,
context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
) -> None:
"""Reset context and replan."""
if self._context is None:
@@ -1468,7 +1468,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
async def _prepare_final_answer(
self,
context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
) -> None:
"""Prepare the final answer using the manager."""
if self._context is None:
@@ -1478,11 +1478,11 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
final_answer = await self._manager.prepare_final_answer(self._context.clone(deep=True))
# Emit a completed event for the workflow
await context.yield_output(final_answer)
await context.yield_output([final_answer])
async def _check_within_limits_or_complete(
self,
context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, ChatMessage],
context: WorkflowContext[_MagenticResponseMessage | _MagenticRequestMessage, list[ChatMessage]],
) -> bool:
"""Check if orchestrator is within operational limits."""
if self._context is None:
@@ -1509,7 +1509,7 @@ class MagenticOrchestratorExecutor(BaseGroupChatOrchestrator):
)
# Yield the partial result and signal completion
await context.yield_output(partial_result)
await context.yield_output([partial_result])
return False
return True
@@ -2283,21 +2283,22 @@ class MagenticWorkflow:
return
# At this point, checkpoint is guaranteed to be WorkflowCheckpoint
executor_states: dict[str, Any] = checkpoint.shared_state.get(EXECUTOR_STATE_KEY, {})
executor_states = cast(dict[str, Any], checkpoint.shared_state.get(EXECUTOR_STATE_KEY, {}))
orchestrator_id = getattr(orchestrator, "id", "")
orchestrator_state = executor_states.get(orchestrator_id)
orchestrator_state = cast(Any, executor_states.get(orchestrator_id))
if orchestrator_state is None:
orchestrator_state = executor_states.get("magentic_orchestrator")
orchestrator_state = cast(Any, executor_states.get("magentic_orchestrator"))
if not isinstance(orchestrator_state, dict):
return
context_payload = orchestrator_state.get("magentic_context")
orchestrator_state_dict = cast(dict[str, Any], orchestrator_state)
context_payload = cast(Any, orchestrator_state_dict.get("magentic_context"))
if not isinstance(context_payload, dict):
return
context_dict = cast(dict[str, Any], context_payload)
restored_participants = context_dict.get("participant_descriptions")
restored_participants = cast(Any, context_dict.get("participant_descriptions"))
if not isinstance(restored_participants, dict):
return
@@ -186,6 +186,10 @@ class ParticipantRegistry:
"""Check if a participant is registered."""
return name in self._participant_entry_ids
def is_participant_registered(self, name: str) -> bool:
"""Check if a participant is registered (alias for is_registered for compatibility)."""
return self.is_registered(name)
def all_participants(self) -> set[str]:
"""Get all registered participant names."""
return set(self._participant_entry_ids.keys())
@@ -1,9 +1,10 @@
# Copyright (c) Microsoft. All rights reserved.
from collections.abc import AsyncIterable, Callable
from typing import Any
from typing import Any, cast
import pytest
from pydantic import BaseModel
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
@@ -14,6 +15,7 @@ from agent_framework import (
AgentThread,
BaseAgent,
ChatMessage,
Executor,
GroupChatBuilder,
GroupChatDirective,
GroupChatStateSnapshot,
@@ -23,21 +25,27 @@ from agent_framework import (
Role,
TextContent,
Workflow,
WorkflowContext,
WorkflowOutputEvent,
handler,
)
from agent_framework._workflows._checkpoint import InMemoryCheckpointStorage
from agent_framework._workflows._group_chat import (
GroupChatOrchestratorExecutor,
ManagerSelectionResponse,
_default_orchestrator_factory, # type: ignore
_default_participant_factory, # type: ignore
_GroupChatConfig, # type: ignore
_PromptBasedGroupChatManager, # type: ignore
_SpeakerSelectorAdapter, # type: ignore
assemble_group_chat_workflow,
)
from agent_framework._workflows._magentic import (
_MagenticProgressLedger, # type: ignore
_MagenticProgressLedgerItem, # type: ignore
_MagenticStartMessage, # type: ignore
)
from agent_framework._workflows._participant_utils import GroupChatParticipantSpec
from agent_framework._workflows._workflow_builder import WorkflowBuilder
class StubAgent(BaseAgent):
@@ -70,6 +78,73 @@ class StubAgent(BaseAgent):
return _stream()
class StubManagerAgent(BaseAgent):
def __init__(self) -> None:
super().__init__(name="manager_agent", description="Stub manager")
self._call_count = 0
async def run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse: # type: ignore[override]
if self._call_count == 0:
self._call_count += 1
payload = {"selected_participant": "agent", "finish": False, "final_message": None}
return AgentRunResponse(
messages=[
ChatMessage(
role=Role.ASSISTANT,
text='{"selected_participant": "agent", "finish": false}',
author_name=self.name,
)
],
value=payload,
)
payload = {"selected_participant": None, "finish": True, "final_message": "agent manager final"}
return AgentRunResponse(
messages=[
ChatMessage(
role=Role.ASSISTANT,
text='{"finish": true, "final_message": "agent manager final"}',
author_name=self.name,
)
],
value=payload,
)
def run_stream(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]: # type: ignore[override]
if self._call_count == 0:
self._call_count += 1
async def _stream_initial() -> AsyncIterable[AgentRunResponseUpdate]:
yield AgentRunResponseUpdate(
contents=[TextContent(text='{"selected_participant": "agent", "finish": false}')],
role=Role.ASSISTANT,
author_name=self.name,
)
return _stream_initial()
async def _stream_final() -> AsyncIterable[AgentRunResponseUpdate]:
yield AgentRunResponseUpdate(
contents=[TextContent(text='{"finish": true, "final_message": "agent manager final"}')],
role=Role.ASSISTANT,
author_name=self.name,
)
return _stream_final()
def make_sequence_selector() -> Callable[[GroupChatStateSnapshot], Any]:
state_counter = {"value": 0}
@@ -123,6 +198,22 @@ class StubMagenticManager(MagenticManagerBase):
return ChatMessage(role=Role.ASSISTANT, text="final", author_name="magentic_manager")
class PassthroughExecutor(Executor):
@handler
async def forward(self, message: Any, ctx: WorkflowContext[Any]) -> None:
await ctx.send_message(message)
class CountingWorkflowBuilder(WorkflowBuilder):
def __init__(self) -> None:
super().__init__()
self.start_calls = 0
def set_start_executor(self, executor: Any) -> "CountingWorkflowBuilder":
self.start_calls += 1
return cast("CountingWorkflowBuilder", super().set_start_executor(executor))
async def test_group_chat_builder_basic_flow() -> None:
selector = make_sequence_selector()
alpha = StubAgent("alpha", "ack from alpha")
@@ -130,21 +221,23 @@ async def test_group_chat_builder_basic_flow() -> None:
workflow = (
GroupChatBuilder()
.select_speakers(selector, display_name="manager", final_message="done")
.set_select_speakers_func(selector, display_name="manager", final_message="done")
.participants(alpha=alpha, beta=beta)
.build()
)
outputs: list[ChatMessage] = []
outputs: list[list[ChatMessage]] = []
async for event in workflow.run_stream("coordinate task"):
if isinstance(event, WorkflowOutputEvent):
data = event.data
if isinstance(data, ChatMessage):
outputs.append(data)
if isinstance(data, list):
outputs.append(cast(list[ChatMessage], data))
assert len(outputs) == 1
assert outputs[0].text == "done"
assert outputs[0].author_name == "manager"
assert len(outputs[0]) >= 1
# The final message should be "done" from the manager
assert outputs[0][-1].text == "done"
assert outputs[0][-1].author_name == "manager"
async def test_magentic_builder_returns_workflow_and_runs() -> None:
@@ -169,11 +262,13 @@ async def test_magentic_builder_returns_workflow_and_runs() -> None:
agent_event_count += 1
if isinstance(event, WorkflowOutputEvent):
msg = event.data
if isinstance(msg, ChatMessage):
outputs.append(msg)
if isinstance(msg, list):
outputs.append(cast(list[ChatMessage], msg))
assert outputs, "Expected a final output message"
final = outputs[-1]
conversation = outputs[-1]
assert len(conversation) >= 1
final = conversation[-1]
assert final.text == "final"
assert final.author_name == "magentic_manager"
assert orchestrator_event_count > 0, "Expected orchestrator events to be emitted"
@@ -187,7 +282,7 @@ async def test_group_chat_as_agent_accepts_conversation() -> None:
workflow = (
GroupChatBuilder()
.select_speakers(selector, display_name="manager", final_message="done")
.set_select_speakers_func(selector, display_name="manager", final_message="done")
.participants(alpha=alpha, beta=beta)
.build()
)
@@ -239,7 +334,7 @@ class TestGroupChatBuilder:
def selector(state: GroupChatStateSnapshot) -> str | None:
return None
builder = GroupChatBuilder().select_speakers(selector)
builder = GroupChatBuilder().set_select_speakers_func(selector)
with pytest.raises(ValueError, match="participants must be configured before build"):
builder.build()
@@ -250,10 +345,10 @@ class TestGroupChatBuilder:
def selector(state: GroupChatStateSnapshot) -> str | None:
return None
builder = GroupChatBuilder().select_speakers(selector)
builder = GroupChatBuilder().set_select_speakers_func(selector)
with pytest.raises(ValueError, match="already has a manager configured"):
builder.select_speakers(selector)
builder.set_select_speakers_func(selector)
def test_empty_participants_raises_error(self) -> None:
"""Test that empty participants list raises ValueError."""
@@ -261,7 +356,7 @@ class TestGroupChatBuilder:
def selector(state: GroupChatStateSnapshot) -> str | None:
return None
builder = GroupChatBuilder().select_speakers(selector)
builder = GroupChatBuilder().set_select_speakers_func(selector)
with pytest.raises(ValueError, match="participants cannot be empty"):
builder.participants([])
@@ -274,7 +369,7 @@ class TestGroupChatBuilder:
def selector(state: GroupChatStateSnapshot) -> str | None:
return None
builder = GroupChatBuilder().select_speakers(selector)
builder = GroupChatBuilder().set_select_speakers_func(selector)
with pytest.raises(ValueError, match="Duplicate participant name 'test'"):
builder.participants([agent1, agent2])
@@ -302,7 +397,7 @@ class TestGroupChatBuilder:
def selector(state: GroupChatStateSnapshot) -> str | None:
return None
builder = GroupChatBuilder().select_speakers(selector)
builder = GroupChatBuilder().set_select_speakers_func(selector)
with pytest.raises(ValueError, match="must define a non-empty 'name' attribute"):
builder.participants([agent])
@@ -314,11 +409,53 @@ class TestGroupChatBuilder:
def selector(state: GroupChatStateSnapshot) -> str | None:
return None
builder = GroupChatBuilder().select_speakers(selector)
builder = GroupChatBuilder().set_select_speakers_func(selector)
with pytest.raises(ValueError, match="participant names must be non-empty strings"):
builder.participants({"": agent})
def test_assemble_group_chat_respects_existing_start_executor(self) -> None:
"""Ensure assemble_group_chat_workflow does not override preconfigured start executor."""
async def manager(_: GroupChatStateSnapshot) -> GroupChatDirective:
return GroupChatDirective(finish=True)
builder = CountingWorkflowBuilder()
entry = PassthroughExecutor(id="entry")
builder = builder.set_start_executor(entry)
participant = PassthroughExecutor(id="participant")
participant_spec = GroupChatParticipantSpec(
name="participant",
participant=participant,
description="participant",
)
wiring = _GroupChatConfig(
manager=manager,
manager_participant=None,
manager_name="manager",
participants={"participant": participant_spec},
max_rounds=None,
termination_condition=None,
participant_aliases={},
participant_executors={"participant": participant},
)
result = assemble_group_chat_workflow(
wiring=wiring,
participant_factory=_default_participant_factory,
orchestrator_factory=_default_orchestrator_factory,
builder=builder,
return_builder=True,
)
assert isinstance(result, tuple)
assembled_builder, _ = result
assert assembled_builder is builder
assert builder.start_calls == 1
assert assembled_builder._start_executor is entry # type: ignore
class TestGroupChatOrchestrator:
"""Tests for GroupChatOrchestratorExecutor core functionality."""
@@ -336,25 +473,116 @@ class TestGroupChatOrchestrator:
workflow = (
GroupChatBuilder()
.select_speakers(selector)
.set_select_speakers_func(selector)
.participants([agent])
.with_max_rounds(2) # Limit to 2 rounds
.build()
)
outputs: list[ChatMessage] = []
outputs: list[list[ChatMessage]] = []
async for event in workflow.run_stream("test task"):
if isinstance(event, WorkflowOutputEvent):
data = event.data
if isinstance(data, ChatMessage):
outputs.append(data)
if isinstance(data, list):
outputs.append(cast(list[ChatMessage], data))
# Should have terminated due to max_rounds, expect at least one output
assert len(outputs) >= 1
# The final message should be about round limit
final_output = outputs[-1]
# The final message in the conversation should be about round limit
conversation = outputs[-1]
assert len(conversation) >= 1
final_output = conversation[-1]
assert "round limit" in final_output.text.lower()
async def test_termination_condition_halts_conversation(self) -> None:
"""Test that a custom termination condition stops the workflow."""
def selector(state: GroupChatStateSnapshot) -> str | None:
return "agent"
def termination_condition(conversation: list[ChatMessage]) -> bool:
replies = [msg for msg in conversation if msg.role == Role.ASSISTANT and msg.author_name == "agent"]
return len(replies) >= 2
agent = StubAgent("agent", "response")
workflow = (
GroupChatBuilder()
.set_select_speakers_func(selector)
.participants([agent])
.with_termination_condition(termination_condition)
.build()
)
outputs: list[list[ChatMessage]] = []
async for event in workflow.run_stream("test task"):
if isinstance(event, WorkflowOutputEvent):
data = event.data
if isinstance(data, list):
outputs.append(cast(list[ChatMessage], data))
assert outputs, "Expected termination to yield output"
conversation = outputs[-1]
agent_replies = [msg for msg in conversation if msg.author_name == "agent" and msg.role == Role.ASSISTANT]
assert len(agent_replies) == 2
final_output = conversation[-1]
assert final_output.author_name == "manager"
assert "termination condition" in final_output.text.lower()
async def test_termination_condition_uses_manager_final_message(self) -> None:
"""Test that manager-provided final message is used on termination."""
async def selector(state: GroupChatStateSnapshot) -> str | None:
return None
agent = StubAgent("agent", "response")
final_text = "manager summary on termination"
workflow = (
GroupChatBuilder()
.set_select_speakers_func(selector, final_message=final_text)
.participants([agent])
.with_termination_condition(lambda _: True)
.build()
)
outputs: list[list[ChatMessage]] = []
async for event in workflow.run_stream("test task"):
if isinstance(event, WorkflowOutputEvent):
data = event.data
if isinstance(data, list):
outputs.append(cast(list[ChatMessage], data))
assert outputs, "Expected termination to yield output"
conversation = outputs[-1]
assert conversation[-1].text == final_text
assert conversation[-1].author_name == "manager"
async def test_termination_condition_agent_manager_finalizes(self) -> None:
"""Test that agent-based manager can provide final message on termination."""
manager = StubManagerAgent()
worker = StubAgent("agent", "response")
workflow = (
GroupChatBuilder()
.set_manager(manager, display_name="Manager")
.participants([worker])
.with_termination_condition(lambda conv: any(msg.author_name == "agent" for msg in conv))
.build()
)
outputs: list[list[ChatMessage]] = []
async for event in workflow.run_stream("test task"):
if isinstance(event, WorkflowOutputEvent):
data = event.data
if isinstance(data, list):
outputs.append(cast(list[ChatMessage], data))
assert outputs, "Expected termination to yield output"
conversation = outputs[-1]
assert conversation[-1].text == "agent manager final"
assert conversation[-1].author_name == "Manager"
async def test_unknown_participant_error(self) -> None:
"""Test that _apply_directive raises error for unknown participants."""
@@ -363,7 +591,7 @@ class TestGroupChatOrchestrator:
agent = StubAgent("agent", "response")
workflow = GroupChatBuilder().select_speakers(selector).participants([agent]).build()
workflow = GroupChatBuilder().set_select_speakers_func(selector).participants([agent]).build()
with pytest.raises(ValueError, match="Manager selected unknown participant 'unknown_agent'"):
async for _ in workflow.run_stream("test task"):
@@ -379,7 +607,7 @@ class TestGroupChatOrchestrator:
agent = StubAgent("agent", "response")
# The _SpeakerSelectorAdapter will catch this and raise TypeError
workflow = GroupChatBuilder().select_speakers(bad_selector).participants([agent]).build() # type: ignore
workflow = GroupChatBuilder().set_select_speakers_func(bad_selector).participants([agent]).build() # type: ignore
# This should raise a TypeError because selector doesn't return str or None
with pytest.raises(TypeError, match="must return a participant name \\(str\\) or None"):
@@ -394,7 +622,7 @@ class TestGroupChatOrchestrator:
agent = StubAgent("agent", "response")
workflow = GroupChatBuilder().select_speakers(selector).participants([agent]).build()
workflow = GroupChatBuilder().set_select_speakers_func(selector).participants([agent]).build()
with pytest.raises(ValueError, match="requires at least one chat message"):
async for _ in workflow.run_stream([]):
@@ -529,69 +757,76 @@ class TestCheckpointing:
storage = InMemoryCheckpointStorage()
workflow = (
GroupChatBuilder().select_speakers(selector).participants([agent]).with_checkpointing(storage).build()
GroupChatBuilder()
.set_select_speakers_func(selector)
.participants([agent])
.with_checkpointing(storage)
.build()
)
outputs: list[ChatMessage] = []
outputs: list[list[ChatMessage]] = []
async for event in workflow.run_stream("test task"):
if isinstance(event, WorkflowOutputEvent):
data = event.data
if isinstance(data, ChatMessage):
outputs.append(data)
if isinstance(data, list):
outputs.append(cast(list[ChatMessage], data))
assert len(outputs) == 1 # Should complete normally
class TestPromptBasedManager:
"""Tests for _PromptBasedGroupChatManager."""
class TestAgentManagerConfiguration:
"""Tests for agent-based manager configuration."""
async def test_manager_with_missing_next_agent_raises_error(self) -> None:
"""Test that manager directive without next_agent raises RuntimeError."""
async def test_set_manager_configures_response_format(self) -> None:
"""Ensure ChatAgent managers receive default ManagerSelectionResponse formatting."""
from unittest.mock import MagicMock
class MockChatClient:
async def get_response(self, messages: Any, response_format: Any = None) -> Any:
# Return response that has finish=False but no next_agent
class MockResponse:
def __init__(self) -> None:
self.value = {"finish": False, "next_agent": None}
self.messages: list[Any] = []
from agent_framework import ChatAgent
return MockResponse()
chat_client = MagicMock()
manager_agent = ChatAgent(chat_client=chat_client, name="Coordinator")
assert manager_agent.chat_options.response_format is None
manager = _PromptBasedGroupChatManager(MockChatClient()) # type: ignore
worker = StubAgent("worker", "response")
state = {
"participants": {"agent": "desc"},
"task": ChatMessage(role=Role.USER, text="test"),
"conversation": (),
}
builder = GroupChatBuilder().set_manager(manager_agent).participants([worker])
with pytest.raises(RuntimeError, match="missing next_agent while finish is False"):
await manager(state)
assert manager_agent.chat_options.response_format is ManagerSelectionResponse
assert builder._manager_participant is manager_agent # type: ignore[attr-defined]
async def test_manager_with_unknown_participant_raises_error(self) -> None:
"""Test that manager selecting unknown participant raises RuntimeError."""
async def test_set_manager_accepts_agent_manager(self) -> None:
"""Verify agent-based manager can be set and workflow builds."""
from unittest.mock import MagicMock
class MockChatClient:
async def get_response(self, messages: Any, response_format: Any = None) -> Any:
# Return response selecting unknown participant
class MockResponse:
def __init__(self) -> None:
self.value = {"finish": False, "next_agent": "unknown"}
self.messages: list[Any] = []
from agent_framework import ChatAgent
return MockResponse()
chat_client = MagicMock()
manager_agent = ChatAgent(chat_client=chat_client, name="Coordinator")
worker = StubAgent("worker", "response")
manager = _PromptBasedGroupChatManager(MockChatClient()) # type: ignore
builder = GroupChatBuilder().set_manager(manager_agent, display_name="Orchestrator")
builder = builder.participants([worker]).with_max_rounds(1)
state = {
"participants": {"agent": "desc"},
"task": ChatMessage(role=Role.USER, text="test"),
"conversation": (),
}
assert builder._manager_participant is manager_agent # type: ignore[attr-defined]
assert "worker" in builder._participants # type: ignore[attr-defined]
with pytest.raises(RuntimeError, match="Manager selected unknown participant 'unknown'"):
await manager(state)
async def test_set_manager_rejects_custom_response_format(self) -> None:
"""Reject custom response_format on ChatAgent managers."""
from unittest.mock import MagicMock
from agent_framework import ChatAgent
class CustomResponse(BaseModel):
value: str
chat_client = MagicMock()
manager_agent = ChatAgent(chat_client=chat_client, name="Coordinator", response_format=CustomResponse)
worker = StubAgent("worker", "response")
with pytest.raises(ValueError, match="response_format must be ManagerSelectionResponse"):
GroupChatBuilder().set_manager(manager_agent).participants([worker])
assert manager_agent.chat_options.response_format is CustomResponse
class TestFactoryFunctions:
@@ -599,9 +834,9 @@ class TestFactoryFunctions:
def test_default_orchestrator_factory_without_manager_raises_error(self) -> None:
"""Test that default factory requires manager to be set."""
config = _GroupChatConfig(manager=None, manager_name="test", participants={})
config = _GroupChatConfig(manager=None, manager_participant=None, manager_name="test", participants={})
with pytest.raises(RuntimeError, match="requires a manager to be set"):
with pytest.raises(RuntimeError, match="requires a manager to be configured"):
_default_orchestrator_factory(config)
@@ -619,14 +854,14 @@ class TestConversationHandling:
agent = StubAgent("agent", "response")
workflow = GroupChatBuilder().select_speakers(selector).participants([agent]).build()
workflow = GroupChatBuilder().set_select_speakers_func(selector).participants([agent]).build()
outputs: list[ChatMessage] = []
outputs: list[list[ChatMessage]] = []
async for event in workflow.run_stream("test string"):
if isinstance(event, WorkflowOutputEvent):
data = event.data
if isinstance(data, ChatMessage):
outputs.append(data)
if isinstance(data, list):
outputs.append(cast(list[ChatMessage], data))
assert len(outputs) == 1
@@ -641,14 +876,14 @@ class TestConversationHandling:
agent = StubAgent("agent", "response")
workflow = GroupChatBuilder().select_speakers(selector).participants([agent]).build()
workflow = GroupChatBuilder().set_select_speakers_func(selector).participants([agent]).build()
outputs: list[ChatMessage] = []
outputs: list[list[ChatMessage]] = []
async for event in workflow.run_stream(task_message):
if isinstance(event, WorkflowOutputEvent):
data = event.data
if isinstance(data, ChatMessage):
outputs.append(data)
if isinstance(data, list):
outputs.append(cast(list[ChatMessage], data))
assert len(outputs) == 1
@@ -667,14 +902,14 @@ class TestConversationHandling:
agent = StubAgent("agent", "response")
workflow = GroupChatBuilder().select_speakers(selector).participants([agent]).build()
workflow = GroupChatBuilder().set_select_speakers_func(selector).participants([agent]).build()
outputs: list[ChatMessage] = []
outputs: list[list[ChatMessage]] = []
async for event in workflow.run_stream(conversation):
if isinstance(event, WorkflowOutputEvent):
data = event.data
if isinstance(data, ChatMessage):
outputs.append(data)
if isinstance(data, list):
outputs.append(cast(list[ChatMessage], data))
assert len(outputs) == 1
@@ -695,23 +930,25 @@ class TestRoundLimitEnforcement:
workflow = (
GroupChatBuilder()
.select_speakers(selector)
.set_select_speakers_func(selector)
.participants([agent])
.with_max_rounds(1) # Very low limit
.build()
)
outputs: list[ChatMessage] = []
outputs: list[list[ChatMessage]] = []
async for event in workflow.run_stream("test"):
if isinstance(event, WorkflowOutputEvent):
data = event.data
if isinstance(data, ChatMessage):
outputs.append(data)
if isinstance(data, list):
outputs.append(cast(list[ChatMessage], data))
# Should have at least one output (the round limit message)
assert len(outputs) >= 1
# The last message should be about round limit
final_output = outputs[-1]
# The last message in the conversation should be about round limit
conversation = outputs[-1]
assert len(conversation) >= 1
final_output = conversation[-1]
assert "round limit" in final_output.text.lower()
async def test_round_limit_in_ingest_participant_message(self) -> None:
@@ -728,23 +965,25 @@ class TestRoundLimitEnforcement:
workflow = (
GroupChatBuilder()
.select_speakers(selector)
.set_select_speakers_func(selector)
.participants([agent])
.with_max_rounds(1) # Hit limit after first response
.build()
)
outputs: list[ChatMessage] = []
outputs: list[list[ChatMessage]] = []
async for event in workflow.run_stream("test"):
if isinstance(event, WorkflowOutputEvent):
data = event.data
if isinstance(data, ChatMessage):
outputs.append(data)
if isinstance(data, list):
outputs.append(cast(list[ChatMessage], data))
# Should have at least one output (the round limit message)
assert len(outputs) >= 1
# The last message should be about round limit
final_output = outputs[-1]
# The last message in the conversation should be about round limit
conversation = outputs[-1]
assert len(conversation) >= 1
final_output = conversation[-1]
assert "round limit" in final_output.text.lower()
@@ -758,12 +997,12 @@ async def test_group_chat_checkpoint_runtime_only() -> None:
agent_b = StubAgent("agentB", "Reply from B")
selector = make_sequence_selector()
wf = GroupChatBuilder().participants([agent_a, agent_b]).select_speakers(selector).build()
wf = GroupChatBuilder().participants([agent_a, agent_b]).set_select_speakers_func(selector).build()
baseline_output: list[ChatMessage] | None = None
async for ev in wf.run_stream("runtime checkpoint test", checkpoint_storage=storage):
if isinstance(ev, WorkflowOutputEvent):
baseline_output = ev.data # type: ignore[assignment]
baseline_output = cast(list[ChatMessage], ev.data) if isinstance(ev.data, list) else None
if isinstance(ev, WorkflowStatusEvent) and ev.state in (
WorkflowRunState.IDLE,
WorkflowRunState.IDLE_WITH_PENDING_REQUESTS,
@@ -794,7 +1033,7 @@ async def test_group_chat_checkpoint_runtime_overrides_buildtime() -> None:
wf = (
GroupChatBuilder()
.participants([agent_a, agent_b])
.select_speakers(selector)
.set_select_speakers_func(selector)
.with_checkpointing(buildtime_storage)
.build()
)
@@ -802,7 +1041,7 @@ async def test_group_chat_checkpoint_runtime_overrides_buildtime() -> None:
baseline_output: list[ChatMessage] | None = None
async for ev in wf.run_stream("override test", checkpoint_storage=runtime_storage):
if isinstance(ev, WorkflowOutputEvent):
baseline_output = ev.data # type: ignore[assignment]
baseline_output = cast(list[ChatMessage], ev.data) if isinstance(ev.data, list) else None
if isinstance(ev, WorkflowStatusEvent) and ev.state in (
WorkflowRunState.IDLE,
WorkflowRunState.IDLE_WITH_PENDING_REQUESTS,
@@ -816,3 +1055,30 @@ async def test_group_chat_checkpoint_runtime_overrides_buildtime() -> None:
assert len(runtime_checkpoints) > 0, "Runtime storage should have checkpoints"
assert len(buildtime_checkpoints) == 0, "Build-time storage should have no checkpoints when overridden"
class _StubExecutor(Executor):
"""Minimal executor used to satisfy workflow wiring in tests."""
def __init__(self, id: str) -> None:
super().__init__(id=id)
@handler
async def handle(self, message: object, ctx: WorkflowContext[ChatMessage]) -> None:
await ctx.yield_output(message)
def test_set_manager_builds_with_agent_manager() -> None:
"""GroupChatBuilder should build when using an agent-based manager."""
manager = _StubExecutor("manager_executor")
participant = _StubExecutor("participant_executor")
workflow = (
GroupChatBuilder().set_manager(manager, display_name="Moderator").participants({"worker": participant}).build()
)
orchestrator = workflow.get_start_executor()
assert isinstance(orchestrator, GroupChatOrchestratorExecutor)
assert orchestrator._is_manager_agent()
@@ -23,7 +23,22 @@ from agent_framework import (
WorkflowOutputEvent,
)
from agent_framework._mcp import MCPTool
from agent_framework._workflows import _handoff as handoff_module # type: ignore
from agent_framework._workflows._handoff import _clone_chat_agent # type: ignore[reportPrivateUsage]
from agent_framework._workflows._workflow_builder import WorkflowBuilder
class _CountingWorkflowBuilder(WorkflowBuilder):
created: list["_CountingWorkflowBuilder"] = []
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.start_calls = 0
_CountingWorkflowBuilder.created.append(self)
def set_start_executor(self, executor: Any) -> "_CountingWorkflowBuilder": # type: ignore[override]
self.start_calls += 1
return cast("_CountingWorkflowBuilder", super().set_start_executor(executor))
@dataclass
@@ -478,6 +493,27 @@ async def test_return_to_previous_enabled():
assert len(specialist_a.calls) == 2, "Specialist A should handle follow-up with return_to_previous enabled"
def test_handoff_builder_sets_start_executor_once(monkeypatch: pytest.MonkeyPatch) -> None:
"""Ensure HandoffBuilder.build sets the start executor only once when assembling the workflow."""
_CountingWorkflowBuilder.created.clear()
monkeypatch.setattr(handoff_module, "WorkflowBuilder", _CountingWorkflowBuilder)
coordinator = _RecordingAgent(name="coordinator")
specialist = _RecordingAgent(name="specialist")
workflow = (
HandoffBuilder(participants=[coordinator, specialist])
.set_coordinator("coordinator")
.with_termination_condition(lambda conv: len(conv) > 0)
.build()
)
assert workflow is not None
assert _CountingWorkflowBuilder.created, "Expected CountingWorkflowBuilder to be instantiated"
builder = _CountingWorkflowBuilder.created[-1]
assert builder.start_calls == 1, "set_start_executor should be invoked exactly once"
async def test_tool_choice_preserved_from_agent_config():
"""Verify that agent-level tool_choice configuration is preserved and not overridden."""
from unittest.mock import AsyncMock
@@ -33,6 +33,7 @@ from agent_framework import (
WorkflowStatusEvent,
handler,
)
from agent_framework._workflows import _group_chat as group_chat_module # type: ignore
from agent_framework._workflows._checkpoint import InMemoryCheckpointStorage
from agent_framework._workflows._magentic import ( # type: ignore[reportPrivateUsage]
MagenticAgentExecutor,
@@ -42,6 +43,7 @@ from agent_framework._workflows._magentic import ( # type: ignore[reportPrivate
_MagenticProgressLedgerItem, # type: ignore
_MagenticStartMessage, # type: ignore
)
from agent_framework._workflows._workflow_builder import WorkflowBuilder
if sys.version_info >= (3, 12):
from typing import override
@@ -162,6 +164,19 @@ class FakeManager(MagenticManagerBase):
return ChatMessage(role=Role.ASSISTANT, text="FINAL", author_name="magentic_manager")
class _CountingWorkflowBuilder(WorkflowBuilder):
created: list["_CountingWorkflowBuilder"] = []
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.start_calls = 0
_CountingWorkflowBuilder.created.append(self)
def set_start_executor(self, executor: Any) -> "_CountingWorkflowBuilder": # type: ignore[override]
self.start_calls += 1
return cast("_CountingWorkflowBuilder", super().set_start_executor(executor))
async def test_standard_manager_plan_and_replan_combined_ledger():
manager = FakeManager(max_round_count=10, max_stall_count=3, max_reset_count=2)
ctx = MagenticContext(
@@ -210,7 +225,7 @@ async def test_magentic_workflow_plan_review_approval_to_completion():
assert req_event is not None
completed = False
output: ChatMessage | None = None
output: list[ChatMessage] | None = None
async for ev in wf.send_responses_streaming(
responses={req_event.request_id: MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)}
):
@@ -222,7 +237,8 @@ async def test_magentic_workflow_plan_review_approval_to_completion():
break
assert completed
assert output is not None
assert isinstance(output, ChatMessage)
assert isinstance(output, list)
assert all(isinstance(msg, ChatMessage) for msg in output)
async def test_magentic_plan_review_approve_with_comments_replans_and_proceeds():
@@ -300,8 +316,10 @@ async def test_magentic_orchestrator_round_limit_produces_partial_result():
output_event = next((e for e in events if isinstance(e, WorkflowOutputEvent)), None)
assert output_event is not None
data = output_event.data
assert isinstance(data, ChatMessage)
assert data.role == Role.ASSISTANT
assert isinstance(data, list)
assert all(isinstance(msg, ChatMessage) for msg in data)
assert len(data) > 0
assert data[-1].role == Role.ASSISTANT
async def test_magentic_checkpoint_resume_round_trip():
@@ -374,6 +392,23 @@ class _DummyExec(Executor):
pass
def test_magentic_builder_sets_start_executor_once(monkeypatch: pytest.MonkeyPatch) -> None:
"""Ensure MagenticBuilder wiring sets the start executor only once."""
_CountingWorkflowBuilder.created.clear()
monkeypatch.setattr(group_chat_module, "WorkflowBuilder", _CountingWorkflowBuilder)
manager = FakeManager()
workflow = (
MagenticBuilder().participants(agentA=_DummyExec("agentA")).with_standard_manager(manager=manager).build()
)
assert workflow is not None
assert _CountingWorkflowBuilder.created, "Expected CountingWorkflowBuilder to be instantiated"
builder = _CountingWorkflowBuilder.created[-1]
assert builder.start_calls == 1, "set_start_executor should be called exactly once"
async def test_magentic_agent_executor_on_checkpoint_save_and_restore_roundtrip():
backing_executor = _DummyExec("backing")
agent_exec = MagenticAgentExecutor(backing_executor, "agentA")
@@ -746,9 +781,11 @@ async def test_magentic_stall_and_reset_successfully():
assert idle_status is not None
output_event = next((e for e in events if isinstance(e, WorkflowOutputEvent)), None)
assert output_event is not None
assert isinstance(output_event.data, ChatMessage)
assert output_event.data.text is not None
assert output_event.data.text == "re-ledger"
assert isinstance(output_event.data, list)
assert all(isinstance(msg, ChatMessage) for msg in output_event.data)
assert len(output_event.data) > 0
assert output_event.data[-1].text is not None
assert output_event.data[-1].text == "re-ledger"
async def test_magentic_checkpoint_runtime_only() -> None:
+1 -91
View File
@@ -268,97 +268,7 @@ This directory contains samples demonstrating the capabilities of Microsoft Agen
## Workflows
### Start Here
| File | Description |
|------|-------------|
| [`getting_started/workflows/_start-here/step1_executors_and_edges.py`](./getting_started/workflows/_start-here/step1_executors_and_edges.py) | Step 1: Foundational patterns: Executors and edges |
| [`getting_started/workflows/_start-here/step2_agents_in_a_workflow.py`](./getting_started/workflows/_start-here/step2_agents_in_a_workflow.py) | Step 2: Agents in a Workflow non-streaming |
| [`getting_started/workflows/_start-here/step3_streaming.py`](./getting_started/workflows/_start-here/step3_streaming.py) | Step 3: Agents in a workflow with streaming |
### Agents in Workflows
| File | Description |
|------|-------------|
| [`getting_started/workflows/agents/azure_ai_agents_streaming.py`](./getting_started/workflows/agents/azure_ai_agents_streaming.py) | Sample: Agents in a workflow with streaming |
| [`getting_started/workflows/agents/azure_chat_agents_function_bridge.py`](./getting_started/workflows/agents/azure_chat_agents_function_bridge.py) | Sample: Two agents connected by a function executor bridge |
| [`getting_started/workflows/agents/azure_chat_agents_streaming.py`](./getting_started/workflows/agents/azure_chat_agents_streaming.py) | Sample: Agents in a workflow with streaming |
| [`getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py`](./getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py) | Sample: Tool-enabled agents with human feedback |
| [`getting_started/workflows/agents/custom_agent_executors.py`](./getting_started/workflows/agents/custom_agent_executors.py) | Step 2: Agents in a Workflow non-streaming |
| [`getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py`](./getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py) | Sample: Workflow Agent with Human-in-the-Loop |
| [`getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py`](./getting_started/workflows/agents/workflow_as_agent_reflection_pattern.py) | Sample: Workflow as Agent with Reflection and Retry Pattern |
### Checkpoint
| File | Description |
|------|-------------|
| [`getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py`](./getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py) | Sample: Checkpoint + human-in-the-loop quickstart |
| [`getting_started/workflows/checkpoint/checkpoint_with_resume.py`](./getting_started/workflows/checkpoint/checkpoint_with_resume.py) | Sample: Checkpointing and Resuming a Workflow (with an Agent stage) |
| [`getting_started/workflows/checkpoint/sub_workflow_checkpoint.py`](./getting_started/workflows/checkpoint/sub_workflow_checkpoint.py) | Sample: Checkpointing for workflows that embed sub-workflows |
### Composition
| File | Description |
|------|-------------|
| [`getting_started/workflows/composition/sub_workflow_basics.py`](./getting_started/workflows/composition/sub_workflow_basics.py) | Sample: Sub-Workflows (Basics) |
| [`getting_started/workflows/composition/sub_workflow_parallel_requests.py`](./getting_started/workflows/composition/sub_workflow_parallel_requests.py) | Sample: Sub-workflow with parallel request handling by specialized interceptors |
| [`getting_started/workflows/composition/sub_workflow_request_interception.py`](./getting_started/workflows/composition/sub_workflow_request_interception.py) | Sample: Sub-Workflows with Request Interception |
### Control Flow
| File | Description |
|------|-------------|
| [`getting_started/workflows/control-flow/edge_condition.py`](./getting_started/workflows/control-flow/edge_condition.py) | Sample: Conditional routing with structured outputs |
| [`getting_started/workflows/control-flow/multi_selection_edge_group.py`](./getting_started/workflows/control-flow/multi_selection_edge_group.py) | Step 06b — Multi-Selection Edge Group sample |
| [`getting_started/workflows/control-flow/sequential_executors.py`](./getting_started/workflows/control-flow/sequential_executors.py) | Sample: Sequential workflow with streaming |
| [`getting_started/workflows/control-flow/sequential_streaming.py`](./getting_started/workflows/control-flow/sequential_streaming.py) | Sample: Foundational sequential workflow with streaming using function-style executors |
| [`getting_started/workflows/control-flow/simple_loop.py`](./getting_started/workflows/control-flow/simple_loop.py) | Sample: Simple Loop (with an Agent Judge) |
| [`getting_started/workflows/control-flow/switch_case_edge_group.py`](./getting_started/workflows/control-flow/switch_case_edge_group.py) | Sample: Switch-Case Edge Group with an explicit Uncertain branch |
### Human-in-the-Loop
| File | Description |
|------|-------------|
| [`getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py`](./getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py) | Sample: Human in the loop guessing game |
| [`getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py`](./getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py) | Sample: Agents with Approval Requests in Workflows |
### Orchestration
| File | Description |
|------|-------------|
| [`getting_started/workflows/orchestration/concurrent_agents.py`](./getting_started/workflows/orchestration/concurrent_agents.py) | Sample: Concurrent fan-out/fan-in (agent-only API) with default aggregator |
| [`getting_started/workflows/orchestration/concurrent_custom_agent_executors.py`](./getting_started/workflows/orchestration/concurrent_custom_agent_executors.py) | Sample: Concurrent Orchestration with Custom Agent Executors |
| [`getting_started/workflows/orchestration/concurrent_custom_aggregator.py`](./getting_started/workflows/orchestration/concurrent_custom_aggregator.py) | Sample: Concurrent Orchestration with Custom Aggregator |
| [`getting_started/workflows/orchestration/group_chat_prompt_based_manager.py`](./getting_started/workflows/orchestration/group_chat_prompt_based_manager.py) | Sample: Group Chat Orchestration with LLM-based manager |
| [`getting_started/workflows/orchestration/group_chat_simple_selector.py`](./getting_started/workflows/orchestration/group_chat_simple_selector.py) | Sample: Group Chat Orchestration with function-based speaker selector |
| [`getting_started/workflows/orchestration/handoff_simple.py`](./getting_started/workflows/orchestration/handoff_simple.py) | Sample: Handoff Orchestration with simple agent handoff pattern |
| [`getting_started/workflows/orchestration/handoff_specialist_to_specialist.py`](./getting_started/workflows/orchestration/handoff_specialist_to_specialist.py) | Sample: Handoff Orchestration with specialist-to-specialist routing |
| [`getting_started/workflows/orchestration/handoff_return_to_previous`](./getting_started/workflows/orchestration/handoff_return_to_previous.py) | Return-to-previous routing: after user input, routes back to the previous specialist instead of coordinator using `.enable_return_to_previous()` |
| [`getting_started/workflows/orchestration/magentic.py`](./getting_started/workflows/orchestration/magentic.py) | Sample: Magentic Orchestration (agentic task planning with multi-agent execution) |
| [`getting_started/workflows/orchestration/magentic_checkpoint.py`](./getting_started/workflows/orchestration/magentic_checkpoint.py) | Sample: Magentic Orchestration with Checkpointing |
| [`getting_started/workflows/orchestration/magentic_human_plan_update.py`](./getting_started/workflows/orchestration/magentic_human_plan_update.py) | Sample: Magentic Orchestration with Human Plan Review |
| [`getting_started/workflows/orchestration/sequential_agents.py`](./getting_started/workflows/orchestration/sequential_agents.py) | Sample: Sequential workflow (agent-focused API) with shared conversation context |
| [`getting_started/workflows/orchestration/sequential_custom_executors.py`](./getting_started/workflows/orchestration/sequential_custom_executors.py) | Sample: Sequential workflow mixing agents and a custom summarizer executor |
### Parallelism
| File | Description |
|------|-------------|
| [`getting_started/workflows/parallelism/aggregate_results_of_different_types.py`](./getting_started/workflows/parallelism/aggregate_results_of_different_types.py) | Sample: Concurrent fan out and fan in with two different tasks that output results of different types |
| [`getting_started/workflows/parallelism/fan_out_fan_in_edges.py`](./getting_started/workflows/parallelism/fan_out_fan_in_edges.py) | Sample: Concurrent fan out and fan in with three domain agents |
| [`getting_started/workflows/parallelism/map_reduce_and_visualization.py`](./getting_started/workflows/parallelism/map_reduce_and_visualization.py) | Sample: Map reduce word count with fan out and fan in over file backed intermediate results |
### State Management
| File | Description |
|------|-------------|
| [`getting_started/workflows/state-management/shared_states_with_agents.py`](./getting_started/workflows/state-management/shared_states_with_agents.py) | Sample: Shared state with agents and conditional routing |
### Visualization
| File | Description |
|------|-------------|
| [`getting_started/workflows/visualization/concurrent_with_visualization.py`](./getting_started/workflows/visualization/concurrent_with_visualization.py) | Sample: Concurrent (Fan-out/Fan-in) with Agents + Visualization |
View the list of Workflows samples [here](./getting_started/workflows/README.md).
## Sample Guidelines
@@ -92,7 +92,8 @@ For observability samples in Agent Framework, see the [observability getting sta
| Concurrent Orchestration (Default Aggregator) | [orchestration/concurrent_agents.py](./orchestration/concurrent_agents.py) | Fan-out to multiple agents; fan-in with default aggregator returning combined ChatMessages |
| Concurrent Orchestration (Custom Aggregator) | [orchestration/concurrent_custom_aggregator.py](./orchestration/concurrent_custom_aggregator.py) | Override aggregator via callback; summarize results with an LLM |
| Concurrent Orchestration (Custom Agent Executors) | [orchestration/concurrent_custom_agent_executors.py](./orchestration/concurrent_custom_agent_executors.py) | Child executors own ChatAgents; concurrent fan-out/fan-in via ConcurrentBuilder |
| Group Chat Orchestration with Prompt Based Manager | [orchestration/group_chat_prompt_based_manager.py](./orchestration/group_chat_prompt_based_manager.py) | LLM Manager-directed conversation using GroupChatBuilder |
| Group Chat with Agent Manager | [orchestration/group_chat_agent_manager.py](./orchestration/group_chat_agent_manager.py) | Agent-based manager using `set_manager()` to select next speaker |
| Group Chat Philosophical Debate | [orchestration/group_chat_philosophical_debate.py](./orchestration/group_chat_philosophical_debate.py) | Agent manager moderates long-form, multi-round debate across diverse participants |
| Group Chat with Simple Function Selector | [orchestration/group_chat_simple_selector.py](./orchestration/group_chat_simple_selector.py) | Group chat with a simple function selector for next speaker |
| Handoff (Simple) | [orchestration/handoff_simple.py](./orchestration/handoff_simple.py) | Single-tier routing: triage agent routes to specialists, control returns to user after each specialist response |
| Handoff (Specialist-to-Specialist) | [orchestration/handoff_specialist_to_specialist.py](./orchestration/handoff_specialist_to_specialist.py) | Multi-tier routing: specialists can hand off to other specialists using `.add_handoff()` fluent API |
@@ -0,0 +1,112 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import logging
from typing import cast
from agent_framework import (
AgentRunUpdateEvent,
ChatAgent,
ChatMessage,
GroupChatBuilder,
Role,
WorkflowOutputEvent,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
logging.basicConfig(level=logging.INFO)
"""
Sample: Group Chat with Agent-Based Manager
What it does:
- Demonstrates the new set_manager() API for agent-based coordination
- Manager is a full ChatAgent with access to tools, context, and observability
- Coordinates a researcher and writer agent to solve tasks collaboratively
Prerequisites:
- OpenAI environment variables configured for OpenAIChatClient
"""
def _get_chat_client() -> AzureOpenAIChatClient:
return AzureOpenAIChatClient(credential=AzureCliCredential())
async def main() -> None:
# Create coordinator agent with structured output for speaker selection
# Note: response_format is enforced to ManagerSelectionResponse by set_manager()
coordinator = ChatAgent(
name="Coordinator",
description="Coordinates multi-agent collaboration by selecting speakers",
instructions="""
You coordinate a team conversation to solve the user's task.
Review the conversation history and select the next participant to speak.
Guidelines:
- Start with Researcher to gather information
- Then have Writer synthesize the final answer
- Only finish after both have contributed meaningfully
- Allow for multiple rounds of information gathering if needed
""",
chat_client=_get_chat_client(),
)
researcher = ChatAgent(
name="Researcher",
description="Collects relevant background information",
instructions="Gather concise facts that help a teammate answer the question.",
chat_client=_get_chat_client(),
)
writer = ChatAgent(
name="Writer",
description="Synthesizes polished answers from gathered information",
instructions="Compose clear and structured answers using any notes provided.",
chat_client=_get_chat_client(),
)
workflow = (
GroupChatBuilder()
.set_manager(coordinator, display_name="Orchestrator")
.with_termination_condition(lambda messages: sum(1 for msg in messages if msg.role == Role.ASSISTANT) >= 2)
.participants([researcher, writer])
.build()
)
task = "What are the key benefits of using async/await in Python? Provide a concise summary."
print("\nStarting Group Chat with Agent-Based Manager...\n")
print(f"TASK: {task}\n")
print("=" * 80)
final_conversation: list[ChatMessage] = []
last_executor_id: str | None = None
async for event in workflow.run_stream(task):
if isinstance(event, AgentRunUpdateEvent):
eid = event.executor_id
if eid != last_executor_id:
if last_executor_id is not None:
print()
print(f"{eid}:", end=" ", flush=True)
last_executor_id = eid
print(event.data, end="", flush=True)
elif isinstance(event, WorkflowOutputEvent):
final_conversation = cast(list[ChatMessage], event.data)
if final_conversation and isinstance(final_conversation, list):
print("\n\n" + "=" * 80)
print("FINAL CONVERSATION")
print("=" * 80)
for msg in final_conversation:
author = getattr(msg, "author_name", "Unknown")
text = getattr(msg, "text", str(msg))
print(f"\n[{author}]")
print(text)
print("-" * 80)
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,408 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import logging
from typing import cast
from agent_framework import (
AgentRunUpdateEvent,
ChatAgent,
ChatMessage,
GroupChatBuilder,
Role,
WorkflowOutputEvent,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
logging.basicConfig(level=logging.WARNING)
"""
Sample: Philosophical Debate with Agent-Based Manager
What it does:
- Creates a diverse group of agents representing different global perspectives
- Uses an agent-based manager to guide a philosophical discussion
- Demonstrates longer, multi-round discourse with natural conversation flow
- Manager decides when discussion has reached meaningful conclusion
Topic: "What does a good life mean to you personally?"
Participants represent:
- Farmer from Southeast Asia (tradition, sustainability, land connection)
- Software Developer from United States (innovation, technology, work-life balance)
- History Teacher from Eastern Europe (legacy, learning, cultural continuity)
- Activist from South America (social justice, environmental rights)
- Spiritual Leader from Middle East (morality, community service)
- Artist from Africa (creative expression, storytelling)
- Immigrant Entrepreneur from Asia in Canada (tradition + adaptation)
- Doctor from Scandinavia (public health, equity, societal support)
Prerequisites:
- OpenAI environment variables configured for OpenAIChatClient
"""
def _get_chat_client() -> AzureOpenAIChatClient:
return AzureOpenAIChatClient(credential=AzureCliCredential())
async def main() -> None:
# Create debate moderator with structured output for speaker selection
# Note: Participant names and descriptions are automatically injected by the orchestrator
moderator = ChatAgent(
name="Moderator",
description="Guides philosophical discussion by selecting next speaker",
instructions="""
You are a thoughtful moderator guiding a philosophical discussion on the topic handed to you by the user.
Your participants bring diverse global perspectives. Select speakers strategically to:
- Create natural conversation flow and responses to previous points
- Ensure all voices are heard throughout the discussion
- Build on themes and contrasts that emerge
- Allow for respectful challenges and counterpoints
- Guide toward meaningful conclusions
Select speakers who can:
1. Respond directly to points just made
2. Introduce fresh perspectives when needed
3. Bridge or contrast different viewpoints
4. Deepen the philosophical exploration
Finish when:
- Multiple rounds have occurred (at least 6-8 exchanges)
- Key themes have been explored from different angles
- Natural conclusion or synthesis has emerged
- Diminishing returns in new insights
In your final_message, provide a brief synthesis highlighting key themes that emerged.
""",
chat_client=_get_chat_client(),
)
farmer = ChatAgent(
name="Farmer",
description="A rural farmer from Southeast Asia",
instructions="""
You're a farmer from Southeast Asia. Your life is deeply connected to land and family.
You value tradition and sustainability. You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use concrete examples from your experience
- Keep responses thoughtful but concise (2-4 sentences)
""",
chat_client=_get_chat_client(),
)
developer = ChatAgent(
name="Developer",
description="An urban software developer from the United States",
instructions="""
You're a software developer from the United States. Your life is fast-paced and technology-driven.
You value innovation, freedom, and work-life balance. You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use concrete examples from your experience
- Keep responses thoughtful but concise (2-4 sentences)
""",
chat_client=_get_chat_client(),
)
teacher = ChatAgent(
name="Teacher",
description="A retired history teacher from Eastern Europe",
instructions="""
You're a retired history teacher from Eastern Europe. You bring historical and philosophical
perspectives to discussions. You value legacy, learning, and cultural continuity.
You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use concrete examples from history or your teaching experience
- Keep responses thoughtful but concise (2-4 sentences)
""",
chat_client=_get_chat_client(),
)
activist = ChatAgent(
name="Activist",
description="A young activist from South America",
instructions="""
You're a young activist from South America. You focus on social justice, environmental rights,
and generational change. You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use concrete examples from your activism
- Keep responses thoughtful but concise (2-4 sentences)
""",
chat_client=_get_chat_client(),
)
spiritual_leader = ChatAgent(
name="SpiritualLeader",
description="A spiritual leader from the Middle East",
instructions="""
You're a spiritual leader from the Middle East. You provide insights grounded in religion,
morality, and community service. You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use examples from spiritual teachings or community work
- Keep responses thoughtful but concise (2-4 sentences)
""",
chat_client=_get_chat_client(),
)
artist = ChatAgent(
name="Artist",
description="An artist from Africa",
instructions="""
You're an artist from Africa. You view life through creative expression, storytelling,
and collective memory. You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use examples from your art or cultural traditions
- Keep responses thoughtful but concise (2-4 sentences)
""",
chat_client=_get_chat_client(),
)
immigrant = ChatAgent(
name="Immigrant",
description="An immigrant entrepreneur from Asia living in Canada",
instructions="""
You're an immigrant entrepreneur from Asia living in Canada. You balance tradition with adaptation.
You focus on family success, risk, and opportunity. You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use examples from your immigrant and entrepreneurial journey
- Keep responses thoughtful but concise (2-4 sentences)
""",
chat_client=_get_chat_client(),
)
doctor = ChatAgent(
name="Doctor",
description="A doctor from Scandinavia",
instructions="""
You're a doctor from Scandinavia. Your perspective is shaped by public health, equity,
and structured societal support. You are in a philosophical debate.
Share your perspective authentically. Feel free to:
- Challenge other participants respectfully
- Build on points others have made
- Use examples from healthcare and societal systems
- Keep responses thoughtful but concise (2-4 sentences)
""",
chat_client=_get_chat_client(),
)
workflow = (
GroupChatBuilder()
.set_manager(moderator, display_name="Moderator")
.participants([farmer, developer, teacher, activist, spiritual_leader, artist, immigrant, doctor])
.with_termination_condition(lambda messages: sum(1 for msg in messages if msg.role == Role.ASSISTANT) >= 10)
.build()
)
topic = "What does a good life mean to you personally?"
print("\n" + "=" * 80)
print("PHILOSOPHICAL DEBATE: Perspectives on a Good Life")
print("=" * 80)
print(f"\nTopic: {topic}")
print("\nParticipants:")
print(" - Farmer (Southeast Asia)")
print(" - Developer (United States)")
print(" - Teacher (Eastern Europe)")
print(" - Activist (South America)")
print(" - SpiritualLeader (Middle East)")
print(" - Artist (Africa)")
print(" - Immigrant (Asia → Canada)")
print(" - Doctor (Scandinavia)")
print("\n" + "=" * 80)
print("DISCUSSION BEGINS")
print("=" * 80 + "\n")
final_conversation: list[ChatMessage] = []
current_speaker: str | None = None
async for event in workflow.run_stream(f"Please begin the discussion on: {topic}"):
if isinstance(event, AgentRunUpdateEvent):
speaker_id = event.executor_id.replace("groupchat_agent:", "")
if speaker_id != current_speaker:
if current_speaker is not None:
print("\n")
print(f"[{speaker_id}]", flush=True)
current_speaker = speaker_id
print(event.data, end="", flush=True)
elif isinstance(event, WorkflowOutputEvent):
final_conversation = cast(list[ChatMessage], event.data)
print("\n\n" + "=" * 80)
print("DISCUSSION SUMMARY")
print("=" * 80)
if final_conversation and isinstance(final_conversation, list) and final_conversation:
final_msg = final_conversation[-1]
if hasattr(final_msg, "author_name") and final_msg.author_name == "Moderator":
print(f"\n{final_msg.text}")
"""
Sample Output:
================================================================================
PHILOSOPHICAL DEBATE: Perspectives on a Good Life
================================================================================
Topic: What does a good life mean to you personally?
Participants:
- Farmer (Southeast Asia)
- Developer (United States)
- Teacher (Eastern Europe)
- Activist (South America)
- SpiritualLeader (Middle East)
- Artist (Africa)
- Immigrant (Asia → Canada)
- Doctor (Scandinavia)
================================================================================
DISCUSSION BEGINS
================================================================================
[Moderator]
{"selected_participant":"Farmer","instruction":"Please start by sharing what living a good life means to you,
especially from your perspective living in a rural area in Southeast Asia.","finish":false,"final_message":null}
[Farmer]
To me, a good life is deeply intertwined with the rhythm of the land and the nurturing of relationships with my
family and community. It means cultivating crops that respect our environment, ensuring sustainability for future
generations, and sharing meals made from our harvests around the dinner table. The joy found in everyday
tasks—planting rice or tending to our livestock—creates a sense of fulfillment that cannot be measured by material
wealth. It's the simple moments, like sharing stories with my children under the stars, that truly define a good
life. What good is progress if it isolates us from those we love and the land that sustains us?
[Moderator]
{"selected_participant":"Developer","instruction":"Given the insights shared by the Farmer, please discuss what a
good life means to you as a software developer in an urban setting in the United States and how it might contrast
with or complement the Farmer's view.","finish":false,"final_message":null}
[Developer]
As a software developer in an urban environment, a good life for me hinges on the intersection of innovation,
creativity, and balance. It's about having the freedom to explore new technologies that can solve real-world
problems while ensuring that my work doesn't encroach on my personal life. For instance, I value remote work
flexibility, which allows me to maintain connections with family and friends, similar to how the Farmer values
community. While our lifestyles may differ markedly, both of us seek fulfillment—whether through meaningful work or
rich personal experiences. The challenge is finding harmony between technological progress and preserving the
intimate human connections that truly enrich our lives.
[Moderator]
{"selected_participant":"SpiritualLeader","instruction":"Reflect on both the Farmer's and Developer's perspectives
and share your view of what constitutes a good life, particularly from your spiritual and cultural standpoint in
the Middle East.","finish":false,"final_message":null}
[SpiritualLeader]
From my spiritual perspective, a good life embodies a balance between personal fulfillment and service to others,
rooted in compassion and community. In our teachings, we emphasize that true happiness comes from helping those in
need and fostering strong connections with our families and neighbors. Whether it's the Farmer nurturing the earth
or the Developer creating tools to enhance lives, both contribute to the greater good. The essence of a good life
lies in our intentions and actions—finding ways to serve our communities, spread kindness, and live harmoniously
with those around us. Ultimately, as we align our personal beliefs with our communal responsibilities, we cultivate
a richness that transcends material wealth.
[Moderator]
{"selected_participant":"Activist","instruction":"Add to the discussion by sharing your perspective on what a good
life entails, particularly from your background as a young activist in South America.","finish":false,
"final_message":null}
[Activist]
As a young activist in South America, a good life for me is about advocating for social justice and environmental
sustainability. It means living in a society where everyone's rights are respected and where marginalized voices,
particularly those of Indigenous communities, are amplified. I see a good life as one where we work collectively to
dismantle oppressive systems—such as deforestation and inequality—while nurturing our planet. For instance, through
my activism, I've witnessed the transformative power of community organizing, where collective efforts lead to real
change, like resisting destructive mining practices that threaten our rivers and lands. A good life, therefore, is
not just lived for oneself but is deeply tied to the well-being of our communities and the health of our
environment. How can we, regardless of our backgrounds, collaborate to foster these essential changes?
[Moderator]
{"selected_participant":"Teacher","instruction":"Considering the views shared so far, tell us how your experience
as a retired history teacher from Eastern Europe shapes your understanding of a good life, perhaps reflecting on
lessons from the past and their impact on present-day life choices.","finish":false,"final_message":null}
[Teacher]
As a retired history teacher from Eastern Europe, my understanding of a good life is deeply rooted in the lessons
drawn from history and the struggle for freedom and dignity. Historical events, such as the fall of the Iron
Curtain, remind us of the profound importance of liberty and collective resilience. A good life, therefore, is about
cherishing our freedoms and working towards a society where everyone has a voice, much as my students and I
discussed the impacts of totalitarian regimes. Additionally, I believe it involves fostering cultural continuity,
where we honor our heritage while embracing progressive values. We must learn from the past—especially the
consequences of neglecting empathy and solidarity—so that we can cultivate a future that values every individual's
contributions to the rich tapestry of our shared humanity. How can we ensure that the lessons of history inform a
more compassionate and just society moving forward?
[Moderator]
{"selected_participant":"Artist","instruction":"Expound on the themes and perspectives discussed so far by sharing
how, as an artist from Africa, you define a good life and how art plays a role in that vision.","finish":false,
"final_message":null}
[Artist]
As an artist from Africa, I define a good life as one steeped in cultural expression, storytelling, and the
celebration of our collective memories. Art is a powerful medium through which we capture our histories, struggles,
and triumphs, creating a tapestry that connects generations. For instance, in my work, I often draw from folktales
and traditional music, weaving narratives that reflect the human experience, much like how the retired teacher
emphasizes learning from history. A good life involves not only personal fulfillment but also the responsibility to
share our narratives and use our creativity to inspire change, whether addressing social injustices or environmental
issues. It's in this interplay of art and activism that we can transcend individual existence and contribute to a
collective good, fostering empathy and understanding among diverse communities. How can we harness art to bridge
differences and amplify marginalized voices in our pursuit of a good life?
[Moderator]
{"selected_participant":null,"instruction":null,"finish":true,"final_message":"As our discussion unfolds, several
key themes have gracefully emerged, reflecting the richness of diverse perspectives on what constitutes a good life.
From the rural farmer's integration with the land to the developer's search for balance between technology and
personal connection, each viewpoint validates that fulfillment, at its core, transcends material wealth. The
spiritual leader and the activist highlight the importance of community and social justice, while the history
teacher and the artist remind us of the lessons and narratives that shape our cultural and personal identities.
Ultimately, the good life seems to revolve around meaningful relationships, honoring our legacies while striving for
progress, and nurturing both our inner selves and external communities. This dialogue demonstrates that despite our
varied backgrounds and experiences, the quest for a good life binds us together, urging cooperation and empathy in
our shared human journey."}
================================================================================
DISCUSSION SUMMARY
================================================================================
As our discussion unfolds, several key themes have gracefully emerged, reflecting the richness of diverse
perspectives on what constitutes a good life. From the rural farmer's integration with the land to the developer's
search for balance between technology and personal connection, each viewpoint validates that fulfillment, at its
core, transcends material wealth. The spiritual leader and the activist highlight the importance of community and
social justice, while the history teacher and the artist remind us of the lessons and narratives that shape our
cultural and personal identities.
Ultimately, the good life seems to revolve around meaningful relationships, honoring our legacies while striving for
progress, and nurturing both our inner selves and external communities. This dialogue demonstrates that despite our
varied backgrounds and experiences, the quest for a good life binds us together, urging cooperation and empathy in
our shared human journey.
"""
if __name__ == "__main__":
asyncio.run(main())
@@ -1,75 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import logging
from agent_framework import AgentRunUpdateEvent, ChatAgent, GroupChatBuilder, WorkflowOutputEvent
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient
logging.basicConfig(level=logging.INFO)
"""
Sample: Group Chat Orchestration (manager-directed)
What it does:
- Demonstrates the generic GroupChatBuilder with a language-model manager directing two agents.
- The manager coordinates a researcher (chat completions) and a writer (responses API) to solve a task.
- Uses the default group chat orchestration pipeline shared with Magentic.
Prerequisites:
- OpenAI environment variables configured for `OpenAIChatClient` and `OpenAIResponsesClient`.
"""
async def main() -> None:
researcher = ChatAgent(
name="Researcher",
description="Collects relevant background information.",
instructions="Gather concise facts that help a teammate answer the question.",
chat_client=OpenAIChatClient(model_id="gpt-4o-mini"),
)
writer = ChatAgent(
name="Writer",
description="Synthesizes a polished answer using the gathered notes.",
instructions="Compose clear and structured answers using any notes provided.",
chat_client=OpenAIResponsesClient(),
)
workflow = (
GroupChatBuilder()
.set_prompt_based_manager(chat_client=OpenAIChatClient(), display_name="Coordinator")
.participants(researcher=researcher, writer=writer)
.build()
)
task = "Outline the core considerations for planning a community hackathon, and finish with a concise action plan."
print("\nStarting Group Chat Workflow...\n")
print(f"TASK: {task}\n")
final_response = None
last_executor_id: str | None = None
async for event in workflow.run_stream(task):
if isinstance(event, AgentRunUpdateEvent):
# Handle the streaming agent update as it's produced
eid = event.executor_id
if eid != last_executor_id:
if last_executor_id is not None:
print()
print(f"{eid}:", end=" ", flush=True)
last_executor_id = eid
print(event.data, end="", flush=True)
elif isinstance(event, WorkflowOutputEvent):
final_response = getattr(event.data, "text", str(event.data))
if final_response:
print("=" * 60)
print("FINAL RESPONSE")
print("=" * 60)
print(final_response)
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())
@@ -2,8 +2,9 @@
import asyncio
import logging
from typing import cast
from agent_framework import ChatAgent, GroupChatBuilder, GroupChatStateSnapshot, WorkflowOutputEvent
from agent_framework import ChatAgent, ChatMessage, GroupChatBuilder, GroupChatStateSnapshot, WorkflowOutputEvent
from agent_framework.openai import OpenAIChatClient
logging.basicConfig(level=logging.INFO)
@@ -12,7 +13,7 @@ logging.basicConfig(level=logging.INFO)
Sample: Group Chat with Simple Speaker Selector Function
What it does:
- Demonstrates the select_speakers() API for GroupChat orchestration
- Demonstrates the set_select_speakers_func() API for GroupChat orchestration
- Uses a pure Python function to control speaker selection based on conversation state
- Alternates between researcher and writer agents in a simple round-robin pattern
- Shows how to access conversation history, round index, and participant metadata
@@ -84,7 +85,7 @@ async def main() -> None:
# 2. Dict form - explicit names: .participants(researcher=researcher, writer=writer)
workflow = (
GroupChatBuilder()
.select_speakers(select_next_speaker, display_name="Orchestrator")
.set_select_speakers_func(select_next_speaker, display_name="Orchestrator")
.participants([researcher, writer]) # Uses agent.name for participant names
.build()
)
@@ -97,11 +98,14 @@ async def main() -> None:
async for event in workflow.run_stream(task):
if isinstance(event, WorkflowOutputEvent):
final_message = event.data
author = getattr(final_message, "author_name", "Unknown")
text = getattr(final_message, "text", str(final_message))
print(f"\n[{author}]\n{text}\n")
print("-" * 80)
conversation = cast(list[ChatMessage], event.data)
if isinstance(conversation, list):
print("\n===== Final Conversation =====\n")
for msg in conversation:
author = getattr(msg, "author_name", "Unknown")
text = getattr(msg, "text", str(msg))
print(f"[{author}]\n{text}\n")
print("-" * 80)
print("\nWorkflow completed.")
@@ -2,20 +2,21 @@
import asyncio
import logging
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatAgent,
ChatMessage,
HostedCodeInterpreterTool,
MagenticAgentDeltaEvent,
MagenticAgentMessageEvent,
MagenticBuilder,
MagenticFinalResultEvent,
MagenticOrchestratorMessageEvent,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
"""
@@ -97,35 +98,30 @@ async def main() -> None:
try:
output: str | None = None
async for event in workflow.run_stream(task):
if isinstance(event, MagenticOrchestratorMessageEvent):
print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}")
elif isinstance(event, MagenticAgentDeltaEvent):
if last_stream_agent_id != event.agent_id or not stream_line_open:
if stream_line_open:
print()
print(f"\n[STREAM:{event.agent_id}]: ", end="", flush=True)
last_stream_agent_id = event.agent_id
stream_line_open = True
if event.text:
print(event.text, end="", flush=True)
elif isinstance(event, MagenticAgentMessageEvent):
if stream_line_open:
print(" (final)")
stream_line_open = False
print()
msg = event.message
if msg is not None:
response_text = (msg.text or "").replace("\n", " ")
print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}")
elif isinstance(event, MagenticFinalResultEvent):
print("\n" + "=" * 50)
print("FINAL RESULT:")
print("=" * 50)
if event.message is not None:
print(event.message.text)
print("=" * 50)
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
event_type = props.get("magentic_event_type") if props else None
if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
kind = props.get("orchestrator_message_kind", "") if props else ""
text = event.data.text if event.data else ""
print(f"\n[ORCH:{kind}]\n\n{text}\n{'-' * 26}")
elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
if last_stream_agent_id != agent_id or not stream_line_open:
if stream_line_open:
print()
print(f"\n[STREAM:{agent_id}]: ", end="", flush=True)
last_stream_agent_id = agent_id
stream_line_open = True
if event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif event.data and event.data.text:
print(event.data.text, end="", flush=True)
elif isinstance(event, WorkflowOutputEvent):
output = str(event.data) if event.data is not None else None
output_messages = cast(list[ChatMessage], event.data)
if output_messages:
output = output_messages[-1].text
if stream_line_open:
print()
@@ -233,8 +233,8 @@ async def run_agent_framework_example(task: str) -> str:
workflow = (
GroupChatBuilder()
.set_prompt_based_manager(
chat_client=AzureOpenAIChatClient(credential=credential),
.set_manager(
manager=AzureOpenAIChatClient(credential=credential).create_agent(),
display_name="Coordinator",
)
.participants(researcher=researcher, planner=planner)
@@ -245,7 +245,12 @@ async def run_agent_framework_example(task: str) -> str:
async for event in workflow.run_stream(task):
if isinstance(event, WorkflowOutputEvent):
data = event.data
final_response = data.text or "" if isinstance(data, ChatMessage) else str(data)
if isinstance(data, list) and len(data) > 0:
# Get the final message from the conversation
final_message = data[-1]
final_response = final_message.text or "" if isinstance(final_message, ChatMessage) else str(data)
else:
final_response = str(data)
return final_response
+15 -14
View File
@@ -280,7 +280,7 @@ dependencies = [
[package.metadata]
requires-dist = [
{ name = "agent-framework-core", editable = "packages/core" },
{ name = "openai-chatkit", specifier = ">=1.1.0,<2.0.0" },
{ name = "openai-chatkit", specifier = ">=1.4.0,<2.0.0" },
]
[[package]]
@@ -798,7 +798,7 @@ wheels = [
[[package]]
name = "anthropic"
version = "0.74.1"
version = "0.75.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
@@ -810,9 +810,9 @@ dependencies = [
{ name = "sniffio", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "typing-extensions", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d7/7b/609eea5c54ae69b1a4a94169d4b0c86dc5c41b43509989913f6cdc61b81d/anthropic-0.74.1.tar.gz", hash = "sha256:04c087b2751385c524f6d332d066a913870e4de8b3e335fb0a0c595f1f88dc6e", size = 428981, upload-time = "2025-11-19T22:17:31.533Z" }
sdist = { url = "https://files.pythonhosted.org/packages/04/1f/08e95f4b7e2d35205ae5dcbb4ae97e7d477fc521c275c02609e2931ece2d/anthropic-0.75.0.tar.gz", hash = "sha256:e8607422f4ab616db2ea5baacc215dd5f028da99ce2f022e33c7c535b29f3dfb", size = 439565, upload-time = "2025-11-24T20:41:45.28Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/dd/45/6b18d0692302b8cbc01a10c35b43953d3c4172fbd4f83337b8ed21a8eaa4/anthropic-0.74.1-py3-none-any.whl", hash = "sha256:b07b998d1cee7f41d9f02530597d7411672b362cc2417760a40c0167b81c6e65", size = 371473, upload-time = "2025-11-19T22:17:29.998Z" },
{ url = "https://files.pythonhosted.org/packages/60/1c/1cd02b7ae64302a6e06724bf80a96401d5313708651d277b1458504a1730/anthropic-0.75.0-py3-none-any.whl", hash = "sha256:ea8317271b6c15d80225a9f3c670152746e88805a7a61e14d4a374577164965b", size = 388164, upload-time = "2025-11-24T20:41:43.587Z" },
]
[[package]]
@@ -1799,7 +1799,7 @@ name = "exceptiongroup"
version = "1.3.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "typing-extensions", marker = "(python_full_version < '3.13' and sys_platform == 'darwin') or (python_full_version < '3.13' and sys_platform == 'linux') or (python_full_version < '3.13' and sys_platform == 'win32')" },
{ name = "typing-extensions", marker = "(python_full_version < '3.11' and sys_platform == 'darwin') or (python_full_version < '3.11' and sys_platform == 'linux') or (python_full_version < '3.11' and sys_platform == 'win32')" },
]
sdist = { url = "https://files.pythonhosted.org/packages/50/79/66800aadf48771f6b62f7eb014e352e5d06856655206165d775e675a02c9/exceptiongroup-1.3.1.tar.gz", hash = "sha256:8b412432c6055b0b7d14c310000ae93352ed6754f70fa8f7c34141f91c4e3219", size = 30371, upload-time = "2025-11-21T23:01:54.787Z" }
wheels = [
@@ -1817,7 +1817,7 @@ wheels = [
[[package]]
name = "fastapi"
version = "0.121.3"
version = "0.122.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "annotated-doc", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
@@ -1825,9 +1825,9 @@ dependencies = [
{ name = "starlette", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "typing-extensions", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/80/f0/086c442c6516195786131b8ca70488c6ef11d2f2e33c9a893576b2b0d3f7/fastapi-0.121.3.tar.gz", hash = "sha256:0055bc24fe53e56a40e9e0ad1ae2baa81622c406e548e501e717634e2dfbc40b", size = 344501, upload-time = "2025-11-19T16:53:39.243Z" }
sdist = { url = "https://files.pythonhosted.org/packages/b2/de/3ee97a4f6ffef1fb70bf20561e4f88531633bb5045dc6cebc0f8471f764d/fastapi-0.122.0.tar.gz", hash = "sha256:cd9b5352031f93773228af8b4c443eedc2ac2aa74b27780387b853c3726fb94b", size = 346436, upload-time = "2025-11-24T19:17:47.95Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/98/b6/4f620d7720fc0a754c8c1b7501d73777f6ba43b57c8ab99671f4d7441eb8/fastapi-0.121.3-py3-none-any.whl", hash = "sha256:0c78fc87587fcd910ca1bbf5bc8ba37b80e119b388a7206b39f0ecc95ebf53e9", size = 109801, upload-time = "2025-11-19T16:53:37.918Z" },
{ url = "https://files.pythonhosted.org/packages/7a/93/aa8072af4ff37b795f6bbf43dcaf61115f40f49935c7dbb180c9afc3f421/fastapi-0.122.0-py3-none-any.whl", hash = "sha256:a456e8915dfc6c8914a50d9651133bd47ec96d331c5b44600baa635538a30d67", size = 110671, upload-time = "2025-11-24T19:17:45.96Z" },
]
[[package]]
@@ -3730,17 +3730,18 @@ wheels = [
[[package]]
name = "openai-chatkit"
version = "1.3.1"
version = "1.4.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "jinja2", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "openai", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "openai-agents", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "pydantic", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "uvicorn", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f7/81/906844005976afb3c415e27f6506ed0f2b8b1040f27b4bc9ef118a256986/openai_chatkit-1.3.1.tar.gz", hash = "sha256:91f39b04584f969642a6c3b4099fdad74c2a357e25d8f746f9709046304a06cf", size = 50730, upload-time = "2025-11-21T21:22:11.62Z" }
sdist = { url = "https://files.pythonhosted.org/packages/c5/89/bf2f094997c8e5cad5334e8a02e05fc458823e65fb2675f45b56b6d1ab73/openai_chatkit-1.4.0.tar.gz", hash = "sha256:e2527dffc3794a05596ad75efa66bdc4efb4ded5a77a013a55496cc989bcf2e6", size = 55269, upload-time = "2025-11-25T21:02:58.503Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/fb/61/235e5f49bd068bbe5f7278bc7d7c4bd92226858698fcac53ab92090baf04/openai_chatkit-1.3.1-py3-none-any.whl", hash = "sha256:5626492e5752879e66b2b6d4fbac51994407d46429de99b91515a77c2e0c6148", size = 35899, upload-time = "2025-11-21T21:22:10.37Z" },
{ url = "https://files.pythonhosted.org/packages/90/bf/68d42561dd8a674b6f8541d879dd165b5ac4d81fcf1027462e154de66a4f/openai_chatkit-1.4.0-py3-none-any.whl", hash = "sha256:35d00ca8398908bd70d63e2284adcd836641cc11746f68d7cfa91d276e3dad3d", size = 39077, upload-time = "2025-11-25T21:02:57.288Z" },
]
[[package]]
@@ -5075,7 +5076,7 @@ wheels = [
[[package]]
name = "qdrant-client"
version = "1.16.0"
version = "1.16.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "grpcio", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
@@ -5087,9 +5088,9 @@ dependencies = [
{ name = "pydantic", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "urllib3", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/fa/16/366541897d270ee3f9c3f87da145baa8a5c9cc5190e0e53e8bbec1267cff/qdrant_client-1.16.0.tar.gz", hash = "sha256:0716aa0b7cca39745829c2e8ea0beb275fe2990e743ad803eabd6218e4b35c1b", size = 284128, upload-time = "2025-11-17T13:19:52.726Z" }
sdist = { url = "https://files.pythonhosted.org/packages/d9/68/fec3816a223c0b73b0e0036460be45c61ce2770ffb9197ac371e4f615ddc/qdrant_client-1.16.1.tar.gz", hash = "sha256:676c7c10fd4d4cb2981b8fcb32fd764f5f661b04b7334d024034d07212f971fd", size = 332130, upload-time = "2025-11-25T04:31:54.212Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a5/ff/3a69bb56835c4b2e9fa780655790937011ac389b0408b9a1147eaa2cee22/qdrant_client-1.16.0-py3-none-any.whl", hash = "sha256:6b932393e84e4c0233e5b2eb96b0918e968725855adae4d9c541761f4c50cf11", size = 328579, upload-time = "2025-11-17T13:19:51.092Z" },
{ url = "https://files.pythonhosted.org/packages/60/e2/60a20d04b0595c641516463168909c5bbcc192d3d6eacb637c1677109c6a/qdrant_client-1.16.1-py3-none-any.whl", hash = "sha256:1eefe89f66e8a468ba0de1680e28b441e69825cfb62e8fb2e457c15e24ce5e3b", size = 378481, upload-time = "2025-11-25T04:31:52.629Z" },
]
[[package]]