Python: Add support for the MagenticWorkflowBuilder (#496)

* magentic happy path - wip

* Support workflow high-level magentic builder API. Add tests and samples.

* Add sample docstring

* Addressing PR feedback round 1

* Fix mypy errors

* Callback improvements

* Rename to MagenticBuilder

* Improvements

* Emit function calling deltas

* PR feedback 2

* Clean up sample
This commit is contained in:
Evan Mattson
2025-08-28 14:39:48 +09:00
committed by GitHub
Unverified
parent 738866f4fe
commit 529341f58b
8 changed files with 2716 additions and 5 deletions
@@ -22,6 +22,13 @@ _IMPORTS = [
"AgentRunStreamingEvent",
"handler",
"AgentExecutor",
"MagenticAgentDeltaEvent",
"MagenticCallbackEvent",
"MagenticCallbackMode",
"MagenticAgentMessageEvent",
"MagenticFinalResultEvent",
"MagenticManagerBase",
"MagenticOrchestratorMessageEvent",
"AgentExecutorRequest",
"AgentExecutorResponse",
"RequestInfoExecutor",
@@ -40,6 +47,12 @@ _IMPORTS = [
"SubWorkflowResponse",
"WorkflowExecutor",
"intercepts_request",
"MagenticBuilder",
"PlanReviewDecision",
"PlanReviewReply",
"PlanReviewRequest",
"RequestInfoEvent",
"StandardMagenticManager",
]
@@ -15,10 +15,19 @@ from agent_framework_workflow import (
ExecutorInvokeEvent,
FileCheckpointStorage,
InMemoryCheckpointStorage,
MagenticBuilder,
MagenticCallbackEvent,
MagenticCallbackMode,
MagenticPlanReviewDecision,
MagenticPlanReviewReply,
MagenticPlanReviewRequest,
MagenticProgressLedger,
MagenticProgressLedgerItem,
RequestInfoEvent,
RequestInfoExecutor,
RequestInfoMessage,
RequestResponse,
StandardMagenticManager,
SubWorkflowRequestInfo,
SubWorkflowResponse,
Workflow,
@@ -51,10 +60,20 @@ __all__ = [
"ExecutorInvokeEvent",
"FileCheckpointStorage",
"InMemoryCheckpointStorage",
"MagenticBuilder",
"MagenticCallbackEvent",
"MagenticCallbackMode",
"MagenticPlanReviewDecision",
"MagenticPlanReviewReply",
"MagenticPlanReviewRequest",
"MagenticProgressLedger",
"MagenticProgressLedgerItem",
"RequestInfoEvent",
"RequestInfoEvent",
"RequestInfoExecutor",
"RequestInfoMessage",
"RequestResponse",
"StandardMagenticManager",
"SubWorkflowRequestInfo",
"SubWorkflowResponse",
"Workflow",
@@ -37,6 +37,28 @@ from ._executor import (
handler,
intercepts_request,
)
from ._magentic import (
MagenticAgentDeltaEvent,
MagenticAgentExecutor,
MagenticAgentMessageEvent,
MagenticBuilder,
MagenticCallbackEvent,
MagenticCallbackMode,
MagenticContext,
MagenticFinalResultEvent,
MagenticManagerBase,
MagenticOrchestratorExecutor,
MagenticOrchestratorMessageEvent,
MagenticPlanReviewDecision,
MagenticPlanReviewReply,
MagenticPlanReviewRequest,
MagenticProgressLedger,
MagenticProgressLedgerItem,
MagenticRequestMessage,
MagenticResponseMessage,
MagenticStartMessage,
StandardMagenticManager,
)
from ._runner_context import (
InProcRunnerContext,
Message,
@@ -79,12 +101,32 @@ __all__ = [
"GraphConnectivityError",
"InMemoryCheckpointStorage",
"InProcRunnerContext",
"MagenticAgentDeltaEvent",
"MagenticAgentExecutor",
"MagenticAgentMessageEvent",
"MagenticBuilder",
"MagenticCallbackEvent",
"MagenticCallbackMode",
"MagenticContext",
"MagenticFinalResultEvent",
"MagenticManagerBase",
"MagenticOrchestratorExecutor",
"MagenticOrchestratorMessageEvent",
"MagenticPlanReviewDecision",
"MagenticPlanReviewReply",
"MagenticPlanReviewRequest",
"MagenticProgressLedger",
"MagenticProgressLedgerItem",
"MagenticRequestMessage",
"MagenticResponseMessage",
"MagenticStartMessage",
"Message",
"RequestInfoEvent",
"RequestInfoExecutor",
"RequestInfoMessage",
"RequestResponse",
"RunnerContext",
"StandardMagenticManager",
"SubWorkflowRequestInfo",
"SubWorkflowResponse",
"TypeCompatibilityError",
@@ -206,7 +206,7 @@ class Executor(AFBaseModel):
if isinstance(response, RequestResponse):
# Add automatic correlation info to the response
correlated_response = RequestResponse[RequestInfoMessage, Any].with_correlation(
response, # pyright: ignore[reportUnknownArgumentType]
response,
request.data,
request.request_id,
)
@@ -431,9 +431,9 @@ class RequestResponse(Generic[TRequest, TResponse]):
original_request: TRequest,
request_id: str,
) -> "RequestResponse[TRequest, TResponse]":
"""Internal method to add correlation info to a response.
"""Add correlation info to a response.
This is called automatically by the framework and should not be used directly.
This is called automatically by the framework when processing intercepted requests.
"""
return RequestResponse(
is_handled=original_response.is_handled,
@@ -661,8 +661,8 @@ class RequestInfoExecutor(Executor):
event = RequestInfoEvent(
request_id=message.request_id, # Use original request ID
source_executor_id=source_executor_id,
request_type=type(message.data), # SubWorkflowRequestInfo type
request_data=message.data, # The full SubWorkflowRequestInfo
request_type=type(message.data), # Type of the wrapped data # type: ignore
request_data=message.data, # The wrapped request data
)
self._request_events[message.request_id] = event
await ctx.add_event(event)
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,451 @@
# Copyright (c) Microsoft. All rights reserved.
from collections.abc import AsyncIterable
from dataclasses import dataclass
from typing import Any
import pytest
from agent_framework import (
AgentRunResponse,
AgentRunResponseUpdate,
ChatMessage,
ChatResponse,
ChatResponseUpdate,
ChatRole,
TextContent,
)
from agent_framework._agents import AgentBase
from agent_framework._clients import ChatClient as AFChatClient
from agent_framework_workflow import (
Executor,
MagenticBuilder,
MagenticManagerBase,
MagenticPlanReviewDecision,
MagenticPlanReviewReply,
MagenticPlanReviewRequest,
MagenticProgressLedger,
MagenticProgressLedgerItem,
RequestInfoEvent,
WorkflowCompletedEvent,
WorkflowContext,
WorkflowEvent, # type: ignore # noqa: E402
handler,
)
from agent_framework_workflow._magentic import (
MagenticContext,
MagenticStartMessage,
)
def test_magentic_start_message_from_string():
    """`MagenticStartMessage.from_string` wraps plain text in a user-role `ChatMessage`."""
    start = MagenticStartMessage.from_string("Do the thing")
    assert isinstance(start, MagenticStartMessage)

    task = start.task
    assert isinstance(task, ChatMessage)
    assert task.role == ChatRole.USER
    assert task.text == "Do the thing"
def test_plan_review_request_defaults_and_reply_variants():
    """Check the default plan review request and the three reply shapes used by reviewers."""
    # A request built with no arguments exposes an id, empty text fields and round 0.
    request = MagenticPlanReviewRequest()
    assert hasattr(request, "request_id")
    assert request.task_text == ""
    assert request.facts_text == ""
    assert request.plan_text == ""
    assert isinstance(request.round_index, int)
    assert request.round_index == 0

    # Reply variants: plain approval, revision with comments, revision with edited plan text.
    approval = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)
    commented_revision = MagenticPlanReviewReply(
        decision=MagenticPlanReviewDecision.REVISE,
        comments="Tighten scope",
    )
    edited_revision = MagenticPlanReviewReply(
        decision=MagenticPlanReviewDecision.REVISE,
        edited_plan_text="- Step 1\n- Step 2",
    )

    assert approval.decision == MagenticPlanReviewDecision.APPROVE
    assert commented_revision.comments == "Tighten scope"
    assert edited_revision.edited_plan_text is not None
    assert edited_revision.edited_plan_text.startswith("- Step 1")
def test_magentic_context_reset_behavior():
    """`MagenticContext.reset` clears history and stall count and bumps the reset counter."""
    context = MagenticContext(
        task=ChatMessage(role=ChatRole.USER, text="task"),
        participant_descriptions={"Alice": "Researcher"},
    )

    # Put the context into a non-initial state so the reset is observable.
    context.chat_history.append(ChatMessage(role=ChatRole.ASSISTANT, text="draft"))
    context.stall_count = 2
    resets_before = context.reset_count

    context.reset()

    assert context.chat_history == []
    assert context.stall_count == 0
    assert context.reset_count == resets_before + 1
@dataclass
class _SimpleLedger:
    """Minimal task ledger used by the fake manager: one facts message and one plan message."""

    # Message holding the gathered facts.
    facts: ChatMessage
    # Message holding the current plan.
    plan: ChatMessage
class FakeManager(MagenticManagerBase):
    """Scripted manager for tests: returns canned ledgers and answers, never calls an LLM."""

    # Last ledger produced by plan()/replan(); None until one of them has run.
    task_ledger: _SimpleLedger | None = None
    # When True, the progress ledger reports satisfaction once chat history is non-empty.
    satisfied_after_signoff: bool = True
    # Participant name reported as the next speaker.
    next_speaker_name: str = "agentA"
    # Instruction text reported in the progress ledger.
    instruction_text: str = "Proceed with step 1"

    async def plan(self, magentic_context: MagenticContext) -> ChatMessage:
        """Store the initial canned facts/plan and return them as one combined ledger message."""
        facts = ChatMessage(role=ChatRole.ASSISTANT, text="GIVEN OR VERIFIED FACTS\n- A\n")
        plan = ChatMessage(role=ChatRole.ASSISTANT, text="- Do X\n- Do Y\n")
        self.task_ledger = _SimpleLedger(facts=facts, plan=plan)

        ledger_text = f"Task: {magentic_context.task.text}\n\nFacts:\n{facts.text}\n\nPlan:\n{plan.text}"
        return ChatMessage(role=ChatRole.ASSISTANT, text=ledger_text, author_name="magentic_manager")

    async def replan(self, magentic_context: MagenticContext) -> ChatMessage:
        """Store the revised canned facts/plan ("A2" / "Do Z") and return the combined ledger message."""
        facts = ChatMessage(role=ChatRole.ASSISTANT, text="GIVEN OR VERIFIED FACTS\n- A2\n")
        plan = ChatMessage(role=ChatRole.ASSISTANT, text="- Do Z\n")
        self.task_ledger = _SimpleLedger(facts=facts, plan=plan)

        ledger_text = f"Task: {magentic_context.task.text}\n\nFacts:\n{facts.text}\n\nPlan:\n{plan.text}"
        return ChatMessage(role=ChatRole.ASSISTANT, text=ledger_text, author_name="magentic_manager")

    async def create_progress_ledger(self, magentic_context: MagenticContext) -> MagenticProgressLedger:
        """Build a progress ledger from the configured fields.

        The request counts as satisfied only when `satisfied_after_signoff` is set
        and the context already holds at least one chat history entry.
        """
        has_history = len(magentic_context.chat_history) > 0
        satisfied = self.satisfied_after_signoff and has_history

        def item(answer):  # every entry shares the same placeholder reason
            return MagenticProgressLedgerItem(reason="test", answer=answer)

        return MagenticProgressLedger(
            is_request_satisfied=item(satisfied),
            is_in_loop=item(False),
            is_progress_being_made=item(True),
            next_speaker=item(self.next_speaker_name),
            instruction_or_question=item(self.instruction_text),
        )

    async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatMessage:
        """Return the fixed final answer message ("FINAL")."""
        return ChatMessage(role=ChatRole.ASSISTANT, text="FINAL", author_name="magentic_manager")
async def test_standard_manager_plan_and_replan_combined_ledger():
    """plan() and replan() on the fake manager return a combined facts/plan ledger message."""
    manager = FakeManager(max_round_count=10, max_stall_count=3, max_reset_count=2)
    context = MagenticContext(
        task=ChatMessage(role=ChatRole.USER, text="demo task"),
        participant_descriptions={"agentA": "Agent A"},
    )

    # Each call gets its own deep copy of the context.
    initial_ledger = await manager.plan(context.model_copy(deep=True))
    assert initial_ledger.role == ChatRole.ASSISTANT
    assert "Facts:" in initial_ledger.text
    assert "Plan:" in initial_ledger.text
    assert manager.task_ledger is not None

    revised_ledger = await manager.replan(context.model_copy(deep=True))
    assert "A2" in revised_ledger.text or "Do Z" in revised_ledger.text
async def test_standard_manager_progress_ledger_and_fallback():
    """The fake manager's progress ledger names the next speaker and honours the satisfaction flag."""
    manager = FakeManager(max_round_count=10)
    context = MagenticContext(
        task=ChatMessage(role=ChatRole.USER, text="demo"),
        participant_descriptions={"agentA": "Agent A"},
    )

    first_ledger = await manager.create_progress_ledger(context.model_copy(deep=True))
    assert isinstance(first_ledger, MagenticProgressLedger)
    assert first_ledger.next_speaker.answer == "agentA"

    # With the flag off the request must be reported as unsatisfied.
    manager.satisfied_after_signoff = False
    second_ledger = await manager.create_progress_ledger(context.model_copy(deep=True))
    assert second_ledger.is_request_satisfied.answer is False
async def test_magentic_workflow_plan_review_approval_to_completion():
    """Approving the plan review request lets the workflow finish with a ChatMessage result."""
    manager = FakeManager(max_round_count=10)
    builder = MagenticBuilder().participants(agentA=_DummyExec("agentA"))
    workflow = builder.with_standard_manager(manager).with_plan_review().build()

    # First pass: drain the stream and remember the (last) plan review request.
    review_event: RequestInfoEvent | None = None
    async for event in workflow.run_streaming("do work"):
        is_review = isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest
        if is_review:
            review_event = event
    assert review_event is not None

    # Second pass: approve the plan and stop at the first completion event.
    approval = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)
    finished: WorkflowCompletedEvent | None = None
    async for event in workflow.send_responses_streaming({review_event.request_id: approval}):
        if isinstance(event, WorkflowCompletedEvent):
            finished = event
            break

    assert finished is not None
    assert isinstance(getattr(finished, "data", None), ChatMessage)
async def test_magentic_plan_review_approve_with_comments_replans_and_proceeds():
    """APPROVE with comments triggers a replan, no second review round, and completion."""

    class CountingManager(FakeManager):
        """FakeManager that counts how many times replan() is invoked."""

        # Declared as a model field so that assignment is permitted under Pydantic.
        replan_count: int = 0

        def __init__(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]
            super().__init__(*args, **kwargs)

        async def replan(self, magentic_context: MagenticContext) -> ChatMessage:  # type: ignore[override]
            self.replan_count += 1
            return await super().replan(magentic_context)

    manager = CountingManager(max_round_count=10)
    builder = MagenticBuilder().participants(agentA=_DummyExec("agentA"))
    workflow = builder.with_standard_manager(manager).with_plan_review().build()

    # Drain the first run and keep the plan review request it raised.
    review_event: RequestInfoEvent | None = None
    async for event in workflow.run_streaming("do work"):
        if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest:
            review_event = event
    assert review_event is not None

    # Approve with comments but without edited text.
    reply = MagenticPlanReviewReply(
        decision=MagenticPlanReviewDecision.APPROVE,
        comments="Looks good; consider Z",
    )
    second_review_seen = False
    finished: WorkflowCompletedEvent | None = None
    async for event in workflow.send_responses_streaming({review_event.request_id: reply}):
        if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest:
            second_review_seen = True
        if isinstance(event, WorkflowCompletedEvent):
            finished = event
            break

    assert finished is not None
    assert manager.replan_count >= 1
    assert second_review_seen is False

    # FakeManager.replan rewrites the ledger to mention "A2" / "Do Z".
    ledger = manager.task_ledger
    assert ledger is not None
    ledger_text = (ledger.facts.text or "") + (ledger.plan.text or "")
    assert ("A2" in ledger_text) or ("Do Z" in ledger_text)
async def test_magentic_orchestrator_round_limit_produces_partial_result():
    """With a round limit of 1 and an unsatisfied manager, the run still completes with an assistant message."""
    manager = FakeManager(max_round_count=1)
    # Never report the request as satisfied, so the run cannot end through satisfaction.
    manager.satisfied_after_signoff = False

    wf = MagenticBuilder().participants(agentA=_DummyExec("agentA")).with_standard_manager(manager).build()

    # WorkflowEvent is already imported at module level; no function-scope re-import is needed.
    events: list[WorkflowEvent] = []
    async for ev in wf.run_streaming("round limit test"):
        events.append(ev)
        # Safety valve so a misbehaving workflow cannot hang the test.
        if len(events) > 50:
            break

    completed = next((e for e in events if isinstance(e, WorkflowCompletedEvent)), None)
    assert completed is not None
    data = getattr(completed, "data", None)
    assert isinstance(data, ChatMessage)
    assert data.role == ChatRole.ASSISTANT
class _DummyExec(Executor):
    """Placeholder executor used as a workflow participant; its handler does nothing."""

    def __init__(self, name: str) -> None:
        super().__init__(name)

    @handler
    async def _noop(self, message: object, ctx: WorkflowContext[object]) -> None:  # pragma: no cover - not called
        """Accept any message and ignore it."""
        return None
from agent_framework_workflow import StandardMagenticManager # noqa: E402
class _StubChatClient(AFChatClient):
    """Chat client stub: fixed "ok" reply for requests and an empty stream for streaming calls."""

    async def get_response(self, messages, **kwargs):  # type: ignore[override]
        """Return a response holding a single assistant message with text "ok"."""
        reply = ChatMessage(role=ChatRole.ASSISTANT, text="ok")
        return ChatResponse(messages=[reply])

    def get_streaming_response(self, messages, **kwargs) -> AsyncIterable[ChatResponseUpdate]:  # type: ignore[override]
        """Return an async generator that yields no updates."""

        async def _empty_stream():
            # The unreachable yield only makes this function an async generator.
            if False:
                yield ChatResponseUpdate()  # pragma: no cover

        return _empty_stream()
async def test_standard_manager_plan_and_replan_via_complete_monkeypatch():
    """Drive StandardMagenticManager.plan/replan with `_complete` replaced by canned fakes."""
    manager = StandardMagenticManager(chat_client=_StubChatClient())

    def assistant(text: str) -> ChatMessage:
        return ChatMessage(role=ChatRole.ASSISTANT, text=text)

    async def fake_complete_plan(messages: list[ChatMessage], **kwargs: Any) -> ChatMessage:
        # Branch on message content: once any message mentions "FACTS", answer with plan steps;
        # otherwise answer with the facts block.
        facts_already_present = any("FACTS" in (m.text or "") for m in messages)
        if facts_already_present:
            return assistant("- step A\n- step B")
        return assistant("GIVEN OR VERIFIED FACTS\n- fact1")

    manager._complete = fake_complete_plan  # type: ignore[attr-defined]

    context = MagenticContext(
        task=ChatMessage(role=ChatRole.USER, text="T"),
        participant_descriptions={"A": "desc"},
    )

    plan_ledger = await manager.plan(context.model_copy(deep=True))
    # The combined ledger carries the structural headings and at least one plan step.
    assert "We are working to address the following user request:" in plan_ledger.text
    assert "Here is the plan to follow as best as possible:" in plan_ledger.text
    assert any(marker in plan_ledger.text for marker in ("- step A", "- step B", "- step"))

    async def fake_complete_replan(messages: list[ChatMessage], **kwargs: Any) -> ChatMessage:
        # Same idea for replanning, keyed on the "Please briefly explain" prompt text.
        plan_requested = any("Please briefly explain" in (m.text or "") for m in messages)
        if plan_requested:
            return assistant("- new step")
        return assistant("GIVEN OR VERIFIED FACTS\n- updated")

    manager._complete = fake_complete_replan  # type: ignore[attr-defined]

    replan_ledger = await manager.replan(context.model_copy(deep=True))
    assert "updated" in replan_ledger.text or "new step" in replan_ledger.text
async def test_standard_manager_progress_ledger_success_and_error():
    """create_progress_ledger parses valid JSON and raises RuntimeError on non-JSON output."""
    manager = StandardMagenticManager(chat_client=_StubChatClient())
    context = MagenticContext(
        task=ChatMessage(role=ChatRole.USER, text="task"),
        participant_descriptions={"alice": "desc"},
    )

    # Success path: the completion returns a well-formed progress ledger as JSON.
    valid_ledger_json = (
        '{"is_request_satisfied": {"reason": "r", "answer": false}, '
        '"is_in_loop": {"reason": "r", "answer": false}, '
        '"is_progress_being_made": {"reason": "r", "answer": true}, '
        '"next_speaker": {"reason": "r", "answer": "alice"}, '
        '"instruction_or_question": {"reason": "r", "answer": "do"}}'
    )

    async def fake_complete_ok(messages: list[ChatMessage], **kwargs: Any) -> ChatMessage:
        return ChatMessage(role=ChatRole.ASSISTANT, text=valid_ledger_json)

    manager._complete = fake_complete_ok  # type: ignore[attr-defined]
    parsed = await manager.create_progress_ledger(context.model_copy(deep=True))
    assert parsed.next_speaker.answer == "alice"

    # Error path: text that is not JSON must surface as a RuntimeError.
    async def fake_complete_bad(messages: list[ChatMessage], **kwargs: Any) -> ChatMessage:
        return ChatMessage(role=ChatRole.ASSISTANT, text="not-json")

    manager._complete = fake_complete_bad  # type: ignore[attr-defined]
    with pytest.raises(RuntimeError):
        await manager.create_progress_ledger(context.model_copy(deep=True))
class InvokeOnceManager(MagenticManagerBase):
    """Manager that asks agentA to speak once and reports the request as satisfied afterwards."""

    def __init__(self) -> None:
        super().__init__(max_round_count=5, max_stall_count=3, max_reset_count=2)
        # Flips to True after the first progress ledger has been handed out.
        self._invoked = False

    async def plan(self, magentic_context: MagenticContext) -> ChatMessage:
        """Return a fixed ledger message."""
        return ChatMessage(role=ChatRole.ASSISTANT, text="ledger")

    async def replan(self, magentic_context: MagenticContext) -> ChatMessage:
        """Return a fixed re-planned ledger message."""
        return ChatMessage(role=ChatRole.ASSISTANT, text="re-ledger")

    async def create_progress_ledger(self, magentic_context: MagenticContext) -> MagenticProgressLedger:
        """First call: unsatisfied, instruct agentA to "say hi". Later calls: satisfied, instruction "done"."""
        first_call = not self._invoked
        if first_call:
            self._invoked = True

        # Only the satisfaction flag and the instruction differ between the two ledgers.
        satisfied = not first_call
        instruction = "say hi" if first_call else "done"
        return MagenticProgressLedger(
            is_request_satisfied=MagenticProgressLedgerItem(reason="r", answer=satisfied),
            is_in_loop=MagenticProgressLedgerItem(reason="r", answer=False),
            is_progress_being_made=MagenticProgressLedgerItem(reason="r", answer=True),
            next_speaker=MagenticProgressLedgerItem(reason="r", answer="agentA"),
            instruction_or_question=MagenticProgressLedgerItem(reason="r", answer=instruction),
        )

    async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatMessage:
        """Return the fixed final answer message ("final")."""
        return ChatMessage(role=ChatRole.ASSISTANT, text="final")
class StubThreadAgent(AgentBase):
    """Agent stub that always answers "thread-ok" as agentA, streaming or not."""

    async def run_streaming(self, messages=None, *, thread=None, **kwargs):  # type: ignore[override]
        """Yield a single update containing the text "thread-ok"."""
        update = AgentRunResponseUpdate(
            contents=[TextContent(text="thread-ok")],
            author_name="agentA",
            role=ChatRole.ASSISTANT,
        )
        yield update

    async def run(self, messages=None, *, thread=None, **kwargs):  # type: ignore[override]
        """Return a response with one assistant message "thread-ok"."""
        reply = ChatMessage(role=ChatRole.ASSISTANT, text="thread-ok", author_name="agentA")
        return AgentRunResponse(messages=[reply])
class StubAssistantsClient:
    """Empty marker class; only its name matters (it contains 'AssistantsClient')."""

    # NOTE(review): presumably the executor inspects the client's type name to pick a code path — confirm.
class StubAssistantsAgent(AgentBase):
    """Agent stub carrying a `StubAssistantsClient`; always answers "assistants-ok" as agentA."""

    # Declared as a field so that assignment in __init__ is permitted under Pydantic.
    chat_client: object | None = None

    def __init__(self) -> None:
        super().__init__()
        # The client's type name contains 'AssistantsClient'.
        self.chat_client = StubAssistantsClient()

    async def run_streaming(self, messages=None, *, thread=None, **kwargs):  # type: ignore[override]
        """Yield a single update containing the text "assistants-ok"."""
        update = AgentRunResponseUpdate(
            contents=[TextContent(text="assistants-ok")],
            author_name="agentA",
            role=ChatRole.ASSISTANT,
        )
        yield update

    async def run(self, messages=None, *, thread=None, **kwargs):  # type: ignore[override]
        """Return a response with one assistant message "assistants-ok"."""
        reply = ChatMessage(role=ChatRole.ASSISTANT, text="assistants-ok", author_name="agentA")
        return AgentRunResponse(messages=[reply])
async def _collect_agent_responses_setup(participant_obj: object):
    """Run a one-participant workflow and return the agent messages seen by the event callback.

    The participant is registered as "agentA" under an `InvokeOnceManager`; every
    `MagenticAgentMessageEvent` with a non-None message is collected.
    """
    from agent_framework_workflow._magentic import MagenticAgentMessageEvent

    agent_messages: list[ChatMessage] = []

    async def sink(event) -> None:  # type: ignore[no-untyped-def]
        if not isinstance(event, MagenticAgentMessageEvent):
            return
        if event.message is not None:
            agent_messages.append(event.message)

    builder = MagenticBuilder().participants(agentA=participant_obj)  # type: ignore[arg-type]
    builder = builder.with_standard_manager(InvokeOnceManager())
    workflow = builder.on_event(sink).build()  # type: ignore

    # Plan review is not enabled; consume the stream with an upper bound on events.
    seen: list[WorkflowEvent] = []
    async for event in workflow.run_streaming("task"):
        seen.append(event)
        if len(seen) > 50:
            break

    return agent_messages
async def test_agent_executor_invoke_with_thread_chat_client():
    """A StubThreadAgent participant produces at least one "ok" message authored by agentA."""
    messages = await _collect_agent_responses_setup(StubThreadAgent())

    matching = [m for m in messages if m.author_name == "agentA" and "ok" in (m.text or "")]
    assert matching
async def test_agent_executor_invoke_with_assistants_client_messages():
    """A StubAssistantsAgent participant produces at least one "ok" message authored by agentA."""
    messages = await _collect_agent_responses_setup(StubAssistantsAgent())

    matching = [m for m in messages if m.author_name == "agentA" and "ok" in (m.text or "")]
    assert matching
@@ -0,0 +1,148 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import logging
from agent_framework import ChatClientAgent, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient
from agent_framework_workflow import (
MagenticAgentDeltaEvent,
MagenticAgentMessageEvent,
MagenticBuilder,
MagenticCallbackEvent,
MagenticCallbackMode,
MagenticFinalResultEvent,
MagenticOrchestratorMessageEvent,
WorkflowCompletedEvent,
)
# Configure root logging at DEBUG level and create this module's logger.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
"""
Magentic Workflow (multi-agent) sample.
This sample shows how to orchestrate multiple agents using the
MagenticBuilder:
- ResearcherAgent (ChatClientAgent backed by an OpenAI chat client) for
finding information.
- CoderAgent (ChatClientAgent backed by the OpenAI Responses client with the
hosted code interpreter tool) for analysis and computation.
The workflow is configured with:
- A Standard Magentic manager (uses a chat client for planning and progress).
- A single unified `on_event` callback (streaming mode) that handles
orchestrator messages, streaming agent deltas, per-message agent responses,
and the final result.
When run, the script builds the workflow, submits a task about estimating the
energy efficiency and CO2 emissions of several ML models, streams intermediate
events to the console, and prints the final aggregated answer at completion.
"""
async def main() -> None:
    """Build a two-agent Magentic workflow, run it on a sample task and print events as they arrive."""
    researcher_agent = ChatClientAgent(
        name="ResearcherAgent",
        description="Specialist in research and information gathering",
        instructions=(
            "You are a Researcher. You find information without additional computation or quantitative analysis."
        ),
        # This agent requires the gpt-4o-search-preview model to perform web searches.
        # Feel free to explore with other agents that support web search, for example,
        # the `OpenAIResponseAgent` or `AzureAIAgent` with bing grounding.
        chat_client=OpenAIChatClient(ai_model_id="gpt-4o-search-preview"),
    )

    coder_agent = ChatClientAgent(
        name="CoderAgent",
        description="A helpful assistant that writes and executes code to process and analyze data.",
        instructions="You solve questions using code. Please provide detailed analysis and computation process.",
        chat_client=OpenAIResponsesClient(),
        tools=HostedCodeInterpreterTool(),
    )

    # Console state shared with on_event: which agent is currently streaming and
    # whether a streamed line is still open (not yet terminated by a newline).
    last_stream_agent_id: str | None = None
    stream_line_open: bool = False

    async def on_event(event: MagenticCallbackEvent) -> None:
        """Print one workflow callback event.

        Handles orchestrator messages, agent delta updates, agent messages and the
        final result event; any other event type is ignored.
        """
        nonlocal last_stream_agent_id, stream_line_open

        if isinstance(event, MagenticOrchestratorMessageEvent):
            print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}")
            return

        if isinstance(event, MagenticAgentDeltaEvent):
            # Start a new labelled line when the streaming agent changes or no line is open.
            if last_stream_agent_id != event.agent_id or not stream_line_open:
                if stream_line_open:
                    print()
                print(f"\n[STREAM:{event.agent_id}]: ", end="", flush=True)
                last_stream_agent_id = event.agent_id
                stream_line_open = True
            print(event.text, end="", flush=True)
            return

        if isinstance(event, MagenticAgentMessageEvent):
            # Close any open streamed line before printing the complete message.
            if stream_line_open:
                print(" (final)")
                stream_line_open = False
                print()
            msg = event.message
            if msg is not None:
                response_text = (msg.text or "").replace("\n", " ")
                print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}")
            return

        if isinstance(event, MagenticFinalResultEvent):
            print("\n" + "=" * 50)
            print("FINAL RESULT:")
            print("=" * 50)
            if event.message is not None:
                print(event.message.text)
            print("=" * 50)

    print("\nBuilding Magentic Workflow...")

    builder = MagenticBuilder().participants(researcher=researcher_agent, coder=coder_agent)
    builder = builder.on_event(on_event, mode=MagenticCallbackMode.STREAMING)
    builder = builder.with_standard_manager(
        chat_client=OpenAIChatClient(),
        max_round_count=10,
        max_stall_count=3,
        max_reset_count=2,
    )
    workflow = builder.build()

    task = (
        "I am preparing a report on the energy efficiency of different machine learning model architectures. "
        "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
        "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
        "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
        "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
        "per task type (image classification, text classification, and text generation)."
    )

    print(f"\nTask: {task}")
    print("\nStarting workflow execution...")

    try:
        completion_event = None
        async for event in workflow.run_streaming(task):
            print(f"Event: {event}")
            if isinstance(event, WorkflowCompletedEvent):
                completion_event = event

        if completion_event is not None:
            data = getattr(completion_event, "data", None)
            # Prefer the result's text; otherwise fall back to its string form (empty when there is no data).
            preview = getattr(data, "text", None)
            if not preview:
                preview = "" if data is None else str(data)
            print(f"Workflow completed with result:\n\n{preview}")
    except Exception as e:
        print(f"Workflow execution failed: {e}")


if __name__ == "__main__":
    asyncio.run(main())
@@ -0,0 +1,189 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import logging
from typing import cast
from agent_framework import ChatClientAgent, HostedCodeInterpreterTool
from agent_framework.openai import OpenAIChatClient, OpenAIResponsesClient
from agent_framework_workflow import (
MagenticAgentDeltaEvent,
MagenticAgentMessageEvent,
MagenticBuilder,
MagenticCallbackEvent,
MagenticCallbackMode,
MagenticFinalResultEvent,
MagenticOrchestratorMessageEvent,
MagenticPlanReviewDecision,
MagenticPlanReviewReply,
MagenticPlanReviewRequest,
RequestInfoEvent,
WorkflowCompletedEvent,
)
# Configure root logging at DEBUG level and create this module's logger.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
"""
Magentic workflow with human-in-the-loop plan review and update.
This sample builds a Magentic workflow with two cooperating agents and enables
plan review so a human can approve or revise the plan before execution:
- researcher: ChatClientAgent backed by OpenAIChatClient (web/search-capable model)
- coder: ChatClientAgent backed by OpenAIResponsesClient with the Hosted Code Interpreter tool
Key behaviors demonstrated:
- with_plan_review(): requests a MagenticPlanReviewRequest before coordination begins
- Event loop that waits for a RequestInfoEvent whose request type is MagenticPlanReviewRequest,
prints the plan, then replies with a MagenticPlanReviewReply (here we auto-approve, but you
can edit/collect input)
- Callbacks: a unified on_event callback (orchestrator messages, incremental agent chunks,
final agent messages, and the final answer) and on_exception
Prereqs: configure your OpenAI credentials in the environment so the Chat/Responses
clients can run. You can swap clients/models as needed.
"""
async def main() -> None:
    """Run the Magentic sample with human-in-the-loop plan review.

    Builds a two-agent workflow with plan review enabled, starts it on a sample
    task, auto-approves every plan review request, and prints events until the
    workflow completes. Any exception raised while running is printed and logged.
    """
    researcher_agent = ChatClientAgent(
        name="ResearcherAgent",
        description="Specialist in research and information gathering",
        instructions=(
            "You are a Researcher. You find information without additional computation or quantitative analysis."
        ),
        # This agent requires the gpt-4o-search-preview model to perform web searches.
        # Feel free to explore with other agents that support web search, for example,
        # the `OpenAIResponseAgent` or `AzureAIAgent` with bing grounding.
        chat_client=OpenAIChatClient(ai_model_id="gpt-4o-search-preview"),
    )

    coder_agent = ChatClientAgent(
        name="CoderAgent",
        description="A helpful assistant that writes and executes code to process and analyze data.",
        instructions="You solve questions using code. Please provide detailed analysis and computation process.",
        chat_client=OpenAIResponsesClient(),
        tools=HostedCodeInterpreterTool(),
    )

    def on_exception(exception: Exception) -> None:
        """Print the exception and log it with its traceback."""
        print(f"Exception occurred: {exception}")
        logger.exception("Workflow exception", exc_info=exception)

    # Console state shared with on_event: which agent is currently streaming and
    # whether a streamed line is still open (not yet terminated by a newline).
    last_stream_agent_id: str | None = None
    stream_line_open: bool = False

    async def on_event(event: MagenticCallbackEvent) -> None:
        """Print one workflow callback event (orchestrator message, agent delta, agent message, final result)."""
        nonlocal last_stream_agent_id, stream_line_open
        if isinstance(event, MagenticOrchestratorMessageEvent):
            print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}")
        elif isinstance(event, MagenticAgentDeltaEvent):
            # Start a new labelled line when the streaming agent changes or no line is open.
            if last_stream_agent_id != event.agent_id or not stream_line_open:
                if stream_line_open:
                    print()
                print(f"\n[STREAM:{event.agent_id}]: ", end="", flush=True)
                last_stream_agent_id = event.agent_id
                stream_line_open = True
            print(event.text, end="", flush=True)
        elif isinstance(event, MagenticAgentMessageEvent):
            # Close any open streamed line before printing the complete message.
            if stream_line_open:
                print(" (final)")
                stream_line_open = False
                print()
            msg = event.message
            if msg is not None:
                response_text = (msg.text or "").replace("\n", " ")
                print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}")
        elif isinstance(event, MagenticFinalResultEvent):
            print("\n" + "=" * 50)
            print("FINAL RESULT:")
            print("=" * 50)
            if event.message is not None:
                print(event.message.text)
            print("=" * 50)

    print("\nBuilding Magentic Workflow...")

    workflow = (
        MagenticBuilder()
        .participants(researcher=researcher_agent, coder=coder_agent)
        .on_exception(on_exception)
        .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
        .with_standard_manager(
            chat_client=OpenAIChatClient(),
            max_round_count=10,
            max_stall_count=3,
            max_reset_count=2,
        )
        .with_plan_review()
        .build()
    )

    task = (
        "I am preparing a report on the energy efficiency of different machine learning model architectures. "
        "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
        "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
        "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
        "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
        "per task type (image classification, text classification, and text generation)."
    )

    print(f"\nTask: {task}")
    print("\nStarting workflow execution...")

    try:
        completion_event: WorkflowCompletedEvent | None = None

        # The first stream starts the task; each later stream delivers a plan review reply.
        # The task itself is started exactly once.
        stream = workflow.run_streaming(task)
        while True:
            # Only a request raised by the stream being drained now can still be pending.
            pending_request: RequestInfoEvent | None = None
            async for event in stream:
                print(f"Event: {event}")
                if isinstance(event, WorkflowCompletedEvent):
                    completion_event = event
                elif isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest:
                    pending_request = event
                    review_req = cast(MagenticPlanReviewRequest, event.data)
                    if review_req.plan_text:
                        print(f"\n=== PLAN REVIEW REQUEST ===\n{review_req.plan_text}\n")

            # Stop as soon as the workflow has completed.
            if completion_event is not None:
                data = getattr(completion_event, "data", None)
                preview = getattr(data, "text", None) or (str(data) if data is not None else "")
                print(f"Workflow completed with result:\n\n{preview}")
                break

            # Nothing to answer and nothing completed: stop instead of restarting the task.
            if pending_request is None:
                print("Workflow stopped without a result or a pending plan review request.")
                break

            # For demo purposes we approve as-is. Replace this with UI input
            # to collect a human decision/comments/edited plan.
            reply = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)
            stream = workflow.send_responses_streaming({pending_request.request_id: reply})
    except Exception as e:
        print(f"Workflow execution failed: {e}")
        on_exception(e)


if __name__ == "__main__":
    asyncio.run(main())