From 529341f58b0dab4489dc24d609f08895e0ec4548 Mon Sep 17 00:00:00 2001 From: Evan Mattson <35585003+moonbox3@users.noreply.github.com> Date: Thu, 28 Aug 2025 14:39:48 +0900 Subject: [PATCH] Python: Add support for the MagenticWorkflowBuilder (#496) * magentic happy path - wip * Support workflow high-level magentic builder API. Add tests and samples. * Add sample docstring * Addressing PR feedback round 1 * Fix mypy errors * Callback improvements * Rename to MagenticBuilder * Improvements * Emit function calling deltas * PR feedback 2 * Clean up sample --- .../main/agent_framework/workflow/__init__.py | 13 + .../agent_framework/workflow/__init__.pyi | 19 + .../agent_framework_workflow/__init__.py | 42 + .../agent_framework_workflow/_executor.py | 10 +- .../agent_framework_workflow/_magentic.py | 1849 +++++++++++++++++ .../packages/workflow/tests/test_magentic.py | 451 ++++ .../workflow/step_08a_magentic_workflow.py | 148 ++ .../step_08b_magentic_human_plan_update.py | 189 ++ 8 files changed, 2716 insertions(+), 5 deletions(-) create mode 100644 python/packages/workflow/agent_framework_workflow/_magentic.py create mode 100644 python/packages/workflow/tests/test_magentic.py create mode 100644 python/samples/getting_started/workflow/step_08a_magentic_workflow.py create mode 100644 python/samples/getting_started/workflow/step_08b_magentic_human_plan_update.py diff --git a/python/packages/main/agent_framework/workflow/__init__.py b/python/packages/main/agent_framework/workflow/__init__.py index 2b848ea837..f74b1f1ff3 100644 --- a/python/packages/main/agent_framework/workflow/__init__.py +++ b/python/packages/main/agent_framework/workflow/__init__.py @@ -22,6 +22,13 @@ _IMPORTS = [ "AgentRunStreamingEvent", "handler", "AgentExecutor", + "MagenticAgentDeltaEvent", + "MagenticCallbackEvent", + "MagenticCallbackMode", + "MagenticAgentMessageEvent", + "MagenticFinalResultEvent", + "MagenticManagerBase", + "MagenticOrchestratorMessageEvent", "AgentExecutorRequest", 
"AgentExecutorResponse", "RequestInfoExecutor", @@ -40,6 +47,12 @@ _IMPORTS = [ "SubWorkflowResponse", "WorkflowExecutor", "intercepts_request", + "MagenticBuilder", + "PlanReviewDecision", + "PlanReviewReply", + "PlanReviewRequest", + "RequestInfoEvent", + "StandardMagenticManager", ] diff --git a/python/packages/main/agent_framework/workflow/__init__.pyi b/python/packages/main/agent_framework/workflow/__init__.pyi index 546783b3bc..5f0e2f7b21 100644 --- a/python/packages/main/agent_framework/workflow/__init__.pyi +++ b/python/packages/main/agent_framework/workflow/__init__.pyi @@ -15,10 +15,19 @@ from agent_framework_workflow import ( ExecutorInvokeEvent, FileCheckpointStorage, InMemoryCheckpointStorage, + MagenticBuilder, + MagenticCallbackEvent, + MagenticCallbackMode, + MagenticPlanReviewDecision, + MagenticPlanReviewReply, + MagenticPlanReviewRequest, + MagenticProgressLedger, + MagenticProgressLedgerItem, RequestInfoEvent, RequestInfoExecutor, RequestInfoMessage, RequestResponse, + StandardMagenticManager, SubWorkflowRequestInfo, SubWorkflowResponse, Workflow, @@ -51,10 +60,20 @@ __all__ = [ "ExecutorInvokeEvent", "FileCheckpointStorage", "InMemoryCheckpointStorage", + "MagenticBuilder", + "MagenticCallbackEvent", + "MagenticCallbackMode", + "MagenticPlanReviewDecision", + "MagenticPlanReviewReply", + "MagenticPlanReviewRequest", + "MagenticProgressLedger", + "MagenticProgressLedgerItem", + "RequestInfoEvent", "RequestInfoEvent", "RequestInfoExecutor", "RequestInfoMessage", "RequestResponse", + "StandardMagenticManager", "SubWorkflowRequestInfo", "SubWorkflowResponse", "Workflow", diff --git a/python/packages/workflow/agent_framework_workflow/__init__.py b/python/packages/workflow/agent_framework_workflow/__init__.py index 12b8ce6e4c..0214193061 100644 --- a/python/packages/workflow/agent_framework_workflow/__init__.py +++ b/python/packages/workflow/agent_framework_workflow/__init__.py @@ -37,6 +37,28 @@ from ._executor import ( handler, intercepts_request, 
) +from ._magentic import ( + MagenticAgentDeltaEvent, + MagenticAgentExecutor, + MagenticAgentMessageEvent, + MagenticBuilder, + MagenticCallbackEvent, + MagenticCallbackMode, + MagenticContext, + MagenticFinalResultEvent, + MagenticManagerBase, + MagenticOrchestratorExecutor, + MagenticOrchestratorMessageEvent, + MagenticPlanReviewDecision, + MagenticPlanReviewReply, + MagenticPlanReviewRequest, + MagenticProgressLedger, + MagenticProgressLedgerItem, + MagenticRequestMessage, + MagenticResponseMessage, + MagenticStartMessage, + StandardMagenticManager, +) from ._runner_context import ( InProcRunnerContext, Message, @@ -79,12 +101,32 @@ __all__ = [ "GraphConnectivityError", "InMemoryCheckpointStorage", "InProcRunnerContext", + "MagenticAgentDeltaEvent", + "MagenticAgentExecutor", + "MagenticAgentMessageEvent", + "MagenticBuilder", + "MagenticCallbackEvent", + "MagenticCallbackMode", + "MagenticContext", + "MagenticFinalResultEvent", + "MagenticManagerBase", + "MagenticOrchestratorExecutor", + "MagenticOrchestratorMessageEvent", + "MagenticPlanReviewDecision", + "MagenticPlanReviewReply", + "MagenticPlanReviewRequest", + "MagenticProgressLedger", + "MagenticProgressLedgerItem", + "MagenticRequestMessage", + "MagenticResponseMessage", + "MagenticStartMessage", "Message", "RequestInfoEvent", "RequestInfoExecutor", "RequestInfoMessage", "RequestResponse", "RunnerContext", + "StandardMagenticManager", "SubWorkflowRequestInfo", "SubWorkflowResponse", "TypeCompatibilityError", diff --git a/python/packages/workflow/agent_framework_workflow/_executor.py b/python/packages/workflow/agent_framework_workflow/_executor.py index b466274ab1..19ccbe6224 100644 --- a/python/packages/workflow/agent_framework_workflow/_executor.py +++ b/python/packages/workflow/agent_framework_workflow/_executor.py @@ -206,7 +206,7 @@ class Executor(AFBaseModel): if isinstance(response, RequestResponse): # Add automatic correlation info to the response correlated_response = 
RequestResponse[RequestInfoMessage, Any].with_correlation( - response, # pyright: ignore[reportUnknownArgumentType] + response, request.data, request.request_id, ) @@ -431,9 +431,9 @@ class RequestResponse(Generic[TRequest, TResponse]): original_request: TRequest, request_id: str, ) -> "RequestResponse[TRequest, TResponse]": - """Internal method to add correlation info to a response. + """Add correlation info to a response. - This is called automatically by the framework and should not be used directly. + This is called automatically by the framework when processing intercepted requests. """ return RequestResponse( is_handled=original_response.is_handled, @@ -661,8 +661,8 @@ class RequestInfoExecutor(Executor): event = RequestInfoEvent( request_id=message.request_id, # Use original request ID source_executor_id=source_executor_id, - request_type=type(message.data), # SubWorkflowRequestInfo type - request_data=message.data, # The full SubWorkflowRequestInfo + request_type=type(message.data), # Type of the wrapped data # type: ignore + request_data=message.data, # The wrapped request data ) self._request_events[message.request_id] = event await ctx.add_event(event) diff --git a/python/packages/workflow/agent_framework_workflow/_magentic.py b/python/packages/workflow/agent_framework_workflow/_magentic.py new file mode 100644 index 0000000000..e0109d1642 --- /dev/null +++ b/python/packages/workflow/agent_framework_workflow/_magentic.py @@ -0,0 +1,1849 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +import asyncio +import contextlib +import json +import logging +import re +import sys +from abc import ABC, abstractmethod +from collections.abc import AsyncIterable, Awaitable, Callable +from dataclasses import dataclass +from enum import Enum +from typing import Annotated, Any, Literal, Protocol, TypeVar, Union, cast +from uuid import uuid4 + +from agent_framework import ( + AgentRunResponse, + AgentRunResponseUpdate, + AIAgent, + ChatClient, + ChatMessage, + ChatRole, + FunctionCallContent, + FunctionResultContent, +) +from agent_framework._agents import AgentBase +from agent_framework._pydantic import AFBaseModel +from pydantic import BaseModel, ConfigDict, Field + +from ._events import WorkflowCompletedEvent, WorkflowEvent +from ._executor import Executor, RequestInfoMessage, RequestResponse, handler +from ._workflow import Workflow, WorkflowBuilder, WorkflowRunResult +from ._workflow_context import WorkflowContext + +if sys.version_info >= (3, 11): + from typing import Self # pragma: no cover +else: + from typing_extensions import Self # pragma: no cover + +logger = logging.getLogger(__name__) + +# Consistent author name for messages produced by the Magentic manager/orchestrator +MAGENTIC_MANAGER_NAME = "magentic_manager" + +# Optional kinds for generic orchestrator message callback +ORCH_MSG_KIND_USER_TASK = "user_task" +ORCH_MSG_KIND_TASK_LEDGER = "task_ledger" +# Newly surfaced kinds for unified callback consumers +ORCH_MSG_KIND_INSTRUCTION = "instruction" +ORCH_MSG_KIND_NOTICE = "notice" + +# region Unified callback API (developer-facing) + + +class MagenticCallbackMode(str, Enum): + """Controls whether agent deltas are surfaced via on_event. + + STREAMING: emit AgentDeltaEvent chunks and a final AgentMessageEvent. + NON_STREAMING: suppress deltas and only emit AgentMessageEvent. 
+ """ + + STREAMING = "streaming" + NON_STREAMING = "non_streaming" + + +@dataclass +class MagenticOrchestratorMessageEvent: + source: Literal["orchestrator"] = "orchestrator" + orchestrator_id: str = "" + message: ChatMessage | None = None + # Kind values include: user_task, task_ledger, instruction, notice + kind: str = "" + + +@dataclass +class MagenticAgentDeltaEvent: + source: Literal["agent"] = "agent" + agent_id: str | None = None + text: str | None = None + # Optional: function/tool streaming payloads + function_call_id: str | None = None + function_call_name: str | None = None + function_call_arguments: Any | None = None + function_result_id: str | None = None + function_result: Any | None = None + role: ChatRole | None = None + + +@dataclass +class MagenticAgentMessageEvent: + source: Literal["agent"] = "agent" + agent_id: str = "" + message: ChatMessage | None = None + + +@dataclass +class MagenticFinalResultEvent: + source: Literal["workflow"] = "workflow" + message: ChatMessage | None = None + + +MagenticCallbackEvent = Union[ + MagenticOrchestratorMessageEvent, + MagenticAgentDeltaEvent, + MagenticAgentMessageEvent, + MagenticFinalResultEvent, +] + + +class CallbackSink(Protocol): + async def __call__(self, event: MagenticCallbackEvent) -> None: ... + + +# endregion Unified callback API + +# region Magentic One Prompts + +ORCHESTRATOR_TASK_LEDGER_FACTS_PROMPT = """Below I will present you a request. + +Before we begin addressing the request, please answer the following pre-survey to the best of your ability. +Keep in mind that you are Ken Jennings-level with trivia, and Mensa-level with puzzles, so there should be +a deep well to draw from. + +Here is the request: + +{task} + +Here is the pre-survey: + + 1. Please list any specific facts or figures that are GIVEN in the request itself. It is possible that + there are none. + 2. Please list any facts that may need to be looked up, and WHERE SPECIFICALLY they might be found. 
+ In some cases, authoritative sources are mentioned in the request itself. + 3. Please list any facts that may need to be derived (e.g., via logical deduction, simulation, or computation) + 4. Please list any facts that are recalled from memory, hunches, well-reasoned guesses, etc. + +When answering this survey, keep in mind that "facts" will typically be specific names, dates, statistics, etc. +Your answer should use headings: + + 1. GIVEN OR VERIFIED FACTS + 2. FACTS TO LOOK UP + 3. FACTS TO DERIVE + 4. EDUCATED GUESSES + +DO NOT include any other headings or sections in your response. DO NOT list next steps or plans until asked to do so. +""" + +ORCHESTRATOR_TASK_LEDGER_PLAN_PROMPT = """Fantastic. To address this request we have assembled the following team: + +{team} + +Based on the team composition, and known and unknown facts, please devise a short bullet-point plan for addressing the +original request. Remember, there is no requirement to involve all team members. A team member's particular expertise +may not be needed for this task. +""" + +# Added to render the ledger in a single assistant message, mirroring the original behavior. +ORCHESTRATOR_TASK_LEDGER_FULL_PROMPT = """ +We are working to address the following user request: + +{task} + + +To answer this request we have assembled the following team: + +{team} + + +Here is an initial fact sheet to consider: + +{facts} + + +Here is the plan to follow as best as possible: + +{plan} +""" + +ORCHESTRATOR_TASK_LEDGER_FACTS_UPDATE_PROMPT = """As a reminder, we are working to solve the following task: + +{task} + +It is clear we are not making as much progress as we would like, but we may have learned something new. +Please rewrite the following fact sheet, updating it to include anything new we have learned that may be helpful. + +Example edits can include (but are not limited to) adding new guesses, moving educated guesses to verified facts +if appropriate, etc. 
Updates may be made to any section of the fact sheet, and more than one section of the fact +sheet can be edited. This is an especially good time to update educated guesses, so please at least add or update +one educated guess or hunch, and explain your reasoning. + +Here is the old fact sheet: + +{old_facts} +""" + +ORCHESTRATOR_TASK_LEDGER_PLAN_UPDATE_PROMPT = """Please briefly explain what went wrong on this last run +(the root cause of the failure), and then come up with a new plan that takes steps and includes hints to overcome prior +challenges and especially avoids repeating the same mistakes. As before, the new plan should be concise, expressed in +bullet-point form, and consider the following team composition: + +{team} +""" + +ORCHESTRATOR_PROGRESS_LEDGER_PROMPT = """ +Recall we are working on the following request: + +{task} + +And we have assembled the following team: + +{team} + +To make progress on the request, please answer the following questions, including necessary reasoning: + + - Is the request fully satisfied? (True if complete, or False if the original request has yet to be + SUCCESSFULLY and FULLY addressed) + - Are we in a loop where we are repeating the same requests and or getting the same responses as before? + Loops can span multiple turns, and can include repeated actions like scrolling up or down more than a + handful of times. + - Are we making forward progress? (True if just starting, or recent messages are adding value. False if recent + messages show evidence of being stuck in a loop or if there is evidence of significant barriers to success + such as the inability to read from a required file) + - Who should speak next? (select from: {names}) + - What instruction or question would you give this team member? (Phrase as if speaking directly to them, and + include any specific information they may need) + +Please output an answer in pure JSON format according to the following schema. The JSON object must be parsable as-is. 
+DO NOT OUTPUT ANYTHING OTHER THAN JSON, AND DO NOT DEVIATE FROM THIS SCHEMA: + +{{ + "is_request_satisfied": {{ + + "reason": string, + "answer": boolean + }}, + "is_in_loop": {{ + "reason": string, + "answer": boolean + }}, + "is_progress_being_made": {{ + "reason": string, + "answer": boolean + }}, + "next_speaker": {{ + "reason": string, + "answer": string (select from: {names}) + }}, + "instruction_or_question": {{ + "reason": string, + "answer": string + }} +}} +""" + +ORCHESTRATOR_FINAL_ANSWER_PROMPT = """ +We are working on the following task: +{task} + +We have completed the task. + +The above messages contain the conversation that took place to complete the task. + +Based on the information gathered, provide the final answer to the original request. +The answer should be phrased as if you were speaking to the user. +""" + + +# region Messages and Types + + +def _new_chat_history() -> list[ChatMessage]: + """Typed default factory for chat history list to satisfy type checkers.""" + return [] + + +@dataclass +class MagenticStartMessage: + """A message to start a magentic workflow.""" + + task: ChatMessage + + @classmethod + def from_string(cls, task_text: str) -> "MagenticStartMessage": + """Create a MagenticStartMessage from a simple string. + + Args: + task_text: The task description as a string. + + Returns: + A MagenticStartMessage with the string converted to a ChatMessage. + """ + return cls(task=ChatMessage(role=ChatRole.USER, text=task_text)) + + +@dataclass +class MagenticRequestMessage: + """A request message type for agents in a magentic workflow.""" + + agent_name: str + instruction: str = "" + task_context: str = "" + + +@dataclass +class MagenticResponseMessage: + """A response message type. + + When emitted by the orchestrator you can mark it as a broadcast to all agents, + or target a specific agent by name. 
+ """ + + body: ChatMessage + target_agent: str | None = None # deliver only to this agent if set + broadcast: bool = False # deliver to all agents if True + + +@dataclass +class MagenticPlanReviewRequest(RequestInfoMessage): + """Human-in-the-loop request to review and optionally edit the plan before execution.""" + + # Because RequestInfoMessage defines a default field (request_id), + # subclass fields must also have defaults to satisfy dataclass rules. + task_text: str = "" + facts_text: str = "" + plan_text: str = "" + round_index: int = 0 # number of review rounds so far + + +class MagenticPlanReviewDecision(str, Enum): + APPROVE = "approve" + REVISE = "revise" + + +@dataclass +class MagenticPlanReviewReply: + """Human reply to a plan review request.""" + + decision: MagenticPlanReviewDecision + edited_plan_text: str | None = None # if supplied, becomes the new plan text verbatim + comments: str | None = None # guidance for replan if no edited text provided + + +class MagenticTaskLedger(AFBaseModel): + """Task ledger for the Standard Magentic manager.""" + + facts: Annotated[ChatMessage, Field(description="The facts about the task.")] + plan: Annotated[ChatMessage, Field(description="The plan for the task.")] + + +class MagenticProgressLedgerItem(AFBaseModel): + """A progress ledger item.""" + + reason: str + answer: str | bool + + +class MagenticProgressLedger(AFBaseModel): + """A progress ledger for tracking workflow progress.""" + + is_request_satisfied: MagenticProgressLedgerItem + is_in_loop: MagenticProgressLedgerItem + is_progress_being_made: MagenticProgressLedgerItem + next_speaker: MagenticProgressLedgerItem + instruction_or_question: MagenticProgressLedgerItem + + +class MagenticContext(AFBaseModel): + """Context for the Magentic manager.""" + + task: Annotated[ChatMessage, Field(description="The task to be completed.")] + chat_history: Annotated[list[ChatMessage], Field(description="The chat history to track conversation.")] = Field( + 
default_factory=_new_chat_history + ) + participant_descriptions: Annotated[ + dict[str, str], Field(description="The descriptions of the participants in the workflow.") + ] + round_count: Annotated[int, Field(description="The number of rounds completed.")] = 0 + stall_count: Annotated[int, Field(description="The number of stalls detected.")] = 0 + reset_count: Annotated[int, Field(description="The number of resets detected.")] = 0 + + def reset(self) -> None: + """Reset the context. + + This will clear the chat history, reset the stall count, and increment the reset count. + This will not reset the task, round count, or participant descriptions. + """ + self.chat_history.clear() + self.stall_count = 0 + self.reset_count += 1 + + +# endregion Messages and Types + +# region Utilities + + +def _team_block(participants: dict[str, str]) -> str: + """Render participant descriptions as a readable block.""" + return "\n".join(f"- {name}: {desc}" for name, desc in participants.items()) + + +def _first_assistant(messages: list[ChatMessage]) -> ChatMessage | None: + for msg in reversed(messages): + if msg.role == ChatRole.ASSISTANT: + return msg + return None + + +def _extract_json(text: str) -> dict[str, Any]: + """Potentially temporary helper method. + + Note: this method is required right now because the ChatClient, when calling + response.text, returns duplicate JSON payloads - need to figure out why. + + The `text` method is concatenating multiple text contents from different messages into a single string. 
+ """ + fence = re.search(r"```(?:json)?\s*(\{[\s\S]*?\})\s*```", text, flags=re.IGNORECASE) + if fence: + candidate = fence.group(1) + else: + # Find first balanced JSON object + start = text.find("{") + if start == -1: + raise ValueError("No JSON object found.") + depth = 0 + end = None + for i, ch in enumerate(text[start:], start=start): + if ch == "{": + depth += 1 + elif ch == "}": + depth -= 1 + if depth == 0: + end = i + 1 + break + if end is None: + raise ValueError("Unbalanced JSON braces.") + candidate = text[start:end] + + for attempt in (candidate, candidate.replace("True", "true").replace("False", "false").replace("None", "null")): + with contextlib.suppress(Exception): + val = json.loads(attempt) + if isinstance(val, dict): + return cast(dict[str, Any], val) + + with contextlib.suppress(Exception): + import ast + + obj = ast.literal_eval(candidate) + if isinstance(obj, dict): + return cast(dict[str, Any], obj) + + raise ValueError("Unable to parse JSON from model output.") + + +TModel = TypeVar("TModel", bound=AFBaseModel) + + +def _pd_validate(model: type[TModel], data: dict[str, Any]) -> TModel: + """Validate against a Pydantic model and return a typed instance.""" + return model.model_validate(data) # type: ignore[attr-defined] + + +# endregion Utilities + +# region Magentic Manager + + +class MagenticManagerBase(AFBaseModel, ABC): + """Base class for the Magentic One manager.""" + + max_stall_count: Annotated[int, Field(description="Max number of stalls before a reset.", ge=0)] = 3 + max_reset_count: Annotated[int | None, Field(description="Max number of resets allowed.", ge=0)] = None + max_round_count: Annotated[int | None, Field(description="Max number of agent responses allowed.", gt=0)] = None + + # Base prompt surface for type safety; concrete managers may override with a str field + task_ledger_full_prompt: str = ORCHESTRATOR_TASK_LEDGER_FULL_PROMPT + + @abstractmethod + async def plan(self, magentic_context: MagenticContext) -> 
ChatMessage: + """Create a plan for the task.""" + ... + + @abstractmethod + async def replan(self, magentic_context: MagenticContext) -> ChatMessage: + """Replan for the task.""" + ... + + @abstractmethod + async def create_progress_ledger(self, magentic_context: MagenticContext) -> MagenticProgressLedger: + """Create a progress ledger.""" + ... + + @abstractmethod + async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatMessage: + """Prepare the final answer.""" + ... + + +class StandardMagenticManager(MagenticManagerBase): + """Standard Magentic manager that performs real LLM calls via a ChatClient. + + The manager constructs prompts that mirror the original Magentic One orchestration: + - Facts gathering + - Plan creation + - Progress ledger in JSON + - Facts update and plan update on reset + - Final answer synthesis + """ + + model_config = ConfigDict(arbitrary_types_allowed=True) + + chat_client: ChatClient + task_ledger: MagenticTaskLedger | None = None + instructions: str | None = None + + # Prompts may be overridden if needed + task_ledger_facts_prompt: str = ORCHESTRATOR_TASK_LEDGER_FACTS_PROMPT + task_ledger_plan_prompt: str = ORCHESTRATOR_TASK_LEDGER_PLAN_PROMPT + task_ledger_full_prompt: str = ORCHESTRATOR_TASK_LEDGER_FULL_PROMPT + task_ledger_facts_update_prompt: str = ORCHESTRATOR_TASK_LEDGER_FACTS_UPDATE_PROMPT + task_ledger_plan_update_prompt: str = ORCHESTRATOR_TASK_LEDGER_PLAN_UPDATE_PROMPT + progress_ledger_prompt: str = ORCHESTRATOR_PROGRESS_LEDGER_PROMPT + final_answer_prompt: str = ORCHESTRATOR_FINAL_ANSWER_PROMPT + + progress_ledger_retry_count: int = Field(default=3) + + def __init__( + self, + chat_client: ChatClient, + task_ledger: MagenticTaskLedger | None = None, + *, + instructions: str | None = None, + task_ledger_facts_prompt: str | None = None, + task_ledger_plan_prompt: str | None = None, + task_ledger_full_prompt: str | None = None, + task_ledger_facts_update_prompt: str | None = None, + 
task_ledger_plan_update_prompt: str | None = None, + progress_ledger_prompt: str | None = None, + final_answer_prompt: str | None = None, + max_stall_count: int = 3, + max_reset_count: int | None = None, + max_round_count: int | None = None, + progress_ledger_retry_count: int | None = None, + ) -> None: + """Initialize the Standard Magentic Manager. + + Args: + chat_client: The chat client to use for LLM calls. + instructions: Instructions for the orchestrator agent. + task_ledger: Optional task ledger for managing task state. + task_ledger_facts_prompt: Optional prompt for the task ledger facts. + task_ledger_plan_prompt: Optional prompt for the task ledger plan. + task_ledger_full_prompt: Optional prompt for the full task ledger. + task_ledger_facts_update_prompt: Optional prompt for updating task ledger facts. + task_ledger_plan_update_prompt: Optional prompt for updating task ledger plan. + progress_ledger_prompt: Optional prompt for the progress ledger. + final_answer_prompt: Optional prompt for the final answer. + max_stall_count: Maximum number of stalls allowed. + max_reset_count: Maximum number of resets allowed. + max_round_count: Maximum number of rounds allowed. + progress_ledger_retry_count: Maximum number of retries for the progress ledger. 
+ """ + args: dict[str, Any] = { + "chat_client": chat_client, + "instructions": instructions, + "max_stall_count": max_stall_count, + "max_reset_count": max_reset_count, + "max_round_count": max_round_count, + } + + # Optional prompt overrides + if task_ledger_facts_prompt is not None: + args["task_ledger_facts_prompt"] = task_ledger_facts_prompt + if task_ledger_plan_prompt is not None: + args["task_ledger_plan_prompt"] = task_ledger_plan_prompt + if task_ledger_full_prompt is not None: + args["task_ledger_full_prompt"] = task_ledger_full_prompt + if task_ledger_facts_update_prompt is not None: + args["task_ledger_facts_update_prompt"] = task_ledger_facts_update_prompt + if task_ledger_plan_update_prompt is not None: + args["task_ledger_plan_update_prompt"] = task_ledger_plan_update_prompt + if progress_ledger_prompt is not None: + args["progress_ledger_prompt"] = progress_ledger_prompt + if final_answer_prompt is not None: + args["final_answer_prompt"] = final_answer_prompt + if progress_ledger_retry_count is not None: + args["progress_ledger_retry_count"] = progress_ledger_retry_count + + super().__init__(**args) + + if task_ledger is not None: + self.task_ledger = task_ledger + + async def _complete( + self, + messages: list[ChatMessage], + *, + response_format: type[BaseModel] | None = None, + ) -> ChatMessage: + """Call the underlying ChatClient directly and return the last assistant message. + + If manager instructions are provided, they are injected as a SYSTEM message + at the start of the request to guide the model consistently without needing + an intermediate Agent wrapper. 
+ """ + # Prepend system instructions if present + request_messages: list[ChatMessage] = [] + if self.instructions: + request_messages.append(ChatMessage(role=ChatRole.SYSTEM, text=self.instructions)) + request_messages.extend(messages) + + # Invoke the chat client non-streaming API + response = await self.chat_client.get_response(request_messages, response_format=response_format) + try: + out_messages: list[ChatMessage] | None = list(response.messages) # type: ignore[assignment] + except Exception: + out_messages = None + + if out_messages: + last = out_messages[-1] + return ChatMessage( + role=last.role or ChatRole.ASSISTANT, + text=last.text or "", + author_name=last.author_name or MAGENTIC_MANAGER_NAME, + ) + + # Fallback if no messages + return ChatMessage(role=ChatRole.ASSISTANT, text="No output produced.", author_name=MAGENTIC_MANAGER_NAME) + + async def plan(self, magentic_context: MagenticContext) -> ChatMessage: + """Create facts and plan using the model, then render a combined task ledger as a single assistant message.""" + task_text = magentic_context.task.text + team_text = _team_block(magentic_context.participant_descriptions) + + # Gather facts + facts_user = ChatMessage( + role=ChatRole.USER, + text=self.task_ledger_facts_prompt.format(task=task_text), + ) + facts_msg = await self._complete([*magentic_context.chat_history, facts_user]) + + # Create plan + plan_user = ChatMessage( + role=ChatRole.USER, + text=self.task_ledger_plan_prompt.format(team=team_text), + ) + plan_msg = await self._complete([*magentic_context.chat_history, facts_user, facts_msg, plan_user]) + + # Store ledger and render full combined view + self.task_ledger = MagenticTaskLedger(facts=facts_msg, plan=plan_msg) + + # Also store individual messages in chat_history for better grounding + # This gives the progress ledger model access to the detailed reasoning + magentic_context.chat_history.extend([facts_user, facts_msg, plan_user, plan_msg]) + + combined = 
self.task_ledger_full_prompt.format( + task=task_text, + team=team_text, + facts=facts_msg.text, + plan=plan_msg.text, + ) + return ChatMessage(role=ChatRole.ASSISTANT, text=combined, author_name=MAGENTIC_MANAGER_NAME) + + async def replan(self, magentic_context: MagenticContext) -> ChatMessage: + """Update facts and plan when stalling or looping has been detected.""" + if self.task_ledger is None: + raise RuntimeError("replan() called before plan(); call plan() once before requesting a replan.") + + task_text = magentic_context.task.text + team_text = _team_block(magentic_context.participant_descriptions) + + # Update facts + facts_update_user = ChatMessage( + role=ChatRole.USER, + text=self.task_ledger_facts_update_prompt.format(task=task_text, old_facts=self.task_ledger.facts.text), + ) + updated_facts = await self._complete([*magentic_context.chat_history, facts_update_user]) + + # Update plan + plan_update_user = ChatMessage( + role=ChatRole.USER, + text=self.task_ledger_plan_update_prompt.format(team=team_text), + ) + updated_plan = await self._complete([ + *magentic_context.chat_history, + facts_update_user, + updated_facts, + plan_update_user, + ]) + + # Store and render + self.task_ledger = MagenticTaskLedger(facts=updated_facts, plan=updated_plan) + + # Also store individual messages in chat_history for better grounding + # This gives the progress ledger model access to the detailed reasoning + magentic_context.chat_history.extend([facts_update_user, updated_facts, plan_update_user, updated_plan]) + + combined = self.task_ledger_full_prompt.format( + task=task_text, + team=team_text, + facts=updated_facts.text, + plan=updated_plan.text, + ) + return ChatMessage(role=ChatRole.ASSISTANT, text=combined, author_name=MAGENTIC_MANAGER_NAME) + + async def create_progress_ledger(self, magentic_context: MagenticContext) -> MagenticProgressLedger: + """Use the model to produce a JSON progress ledger based on the conversation so far. 
+ + Adds lightweight retries with backoff for transient parse issues and avoids selecting a + non-existent "unknown" agent. If there are no participants, a clear error is raised. + """ + agent_names = list(magentic_context.participant_descriptions.keys()) + if not agent_names: + raise RuntimeError("No participants configured; cannot determine next speaker.") + + names_csv = ", ".join(agent_names) + team_text = _team_block(magentic_context.participant_descriptions) + + prompt = self.progress_ledger_prompt.format( + task=magentic_context.task.text, + team=team_text, + names=names_csv, + ) + user_message = ChatMessage(role=ChatRole.USER, text=prompt) + + # Include full context to help the model decide current stage, with small retry loop + attempts = 0 + last_error: Exception | None = None + while attempts < self.progress_ledger_retry_count: + raw = await self._complete( + [*magentic_context.chat_history, user_message], + response_format=MagenticProgressLedger, + ) + try: + ledger_dict = _extract_json(raw.text) + return _pd_validate(MagenticProgressLedger, ledger_dict) + except Exception as ex: + last_error = ex + attempts += 1 + logger.warning( + f"Progress ledger JSON parse failed (attempt {attempts}/{self.progress_ledger_retry_count}): {ex}" + ) + if attempts < self.progress_ledger_retry_count: + # brief backoff before next try + await asyncio.sleep(0.25 * attempts) + + raise RuntimeError( + f"Progress ledger parse failed after {self.progress_ledger_retry_count} attempt(s): {last_error}" + ) + + async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatMessage: + """Ask the model to produce the final answer addressed to the user.""" + prompt = self.final_answer_prompt.format(task=magentic_context.task.text) + user_message = ChatMessage(role=ChatRole.USER, text=prompt) + response = await self._complete([*magentic_context.chat_history, user_message]) + # Ensure role is assistant + return ChatMessage( + role=ChatRole.ASSISTANT, + 
def __init__(
    self,
    manager: MagenticManagerBase,
    participants: dict[str, str],
    result_callback: Callable[[ChatMessage], Awaitable[None]] | None = None,
    agent_response_callback: Callable[[str, ChatMessage], Awaitable[None]] | None = None,
    streaming_agent_response_callback: Callable[[str, AgentRunResponseUpdate, bool], Awaitable[None]] | None = None,
    *,
    message_callback: Callable[[str, ChatMessage, str], Awaitable[None]] | None = None,
    require_plan_signoff: bool = False,
    max_plan_review_rounds: int = 10,
    executor_id: str | None = None,
) -> None:
    """Initialize the orchestrator executor.

    Args:
        manager: The Magentic manager used for planning, replanning, progress ledgers and the
            final answer.
        participants: Mapping of participant name to participant description. It is handed to
            ``MagenticContext`` as ``participant_descriptions``, and its keys are the only
            names accepted as the next speaker.
        result_callback: Optional async callback awaited with the final (or partial) result.
        agent_response_callback: Optional async callback for complete agent responses. It is
            stored on the instance. NOTE(review): the orchestrator code in this section never
            invokes it; confirm whether it is still needed here.
        streaming_agent_response_callback: Optional async callback for streaming agent updates.
            Stored on the instance; same review note as ``agent_response_callback``.
        message_callback: Optional async callback for orchestrator-emitted messages. It is
            awaited with ``(orchestrator_id, message, kind)`` where ``kind`` is a string such as
            ORCH_MSG_KIND_USER_TASK or ORCH_MSG_KIND_TASK_LEDGER.
        require_plan_signoff: Whether a human must sign off on the plan before coordination
            starts.
        max_plan_review_rounds: Number of revision rounds after which plan review is closed and
            the current plan is used.
        executor_id: Optional executor id. Defaults to ``magentic_orchestrator_`` followed by
            eight random hex characters.
    """
    super().__init__(executor_id or f"magentic_orchestrator_{uuid4().hex[:8]}")
    self._manager = manager
    self._participants = participants
    self._result_callback = result_callback
    self._message_callback = message_callback
    self._agent_response_callback = agent_response_callback
    self._streaming_agent_response_callback = streaming_agent_response_callback
    # Orchestration state; both stay None until a start message has been handled.
    self._context = None
    self._task_ledger = None
    self._require_plan_signoff = require_plan_signoff
    self._plan_review_round = 0
    self._max_plan_review_rounds = max_plan_review_rounds
    # Serializes coordination rounds.
    self._inner_loop_lock = asyncio.Lock()
    # Registry of agent executors for internal coordination (e.g., resets)
    self._agent_executors = {}
    # Terminal state marker to stop further processing after completion/limits
    self._terminated = False
@handler
async def handle_response_message(
    self,
    message: MagenticResponseMessage,
    context: WorkflowContext[MagenticResponseMessage | MagenticRequestMessage],
) -> None:
    """Record an agent's reply in the orchestrator history and run the next coordination round.

    Does nothing once the orchestrator is marked terminated. For replies whose role is not
    USER, a short USER "Transferred to <author>" marker is appended before the reply itself.

    Args:
        message: The agent's response; ``message.body`` is the chat message to record.
        context: Workflow context passed on to the inner loop.

    Raises:
        RuntimeError: If no start message has been handled yet (``_context`` is unset).
    """
    if getattr(self, "_terminated", False):
        return
    if self._context is None:
        raise RuntimeError("Magentic Orchestrator: Received response but not initialized")

    logger.debug("Magentic Orchestrator: Received response from agent")

    reply = message.body
    history = self._context.chat_history

    # Add transfer message if needed
    if reply.role != ChatRole.USER:
        # getattr's default only covers a *missing* attribute. An author_name that exists but
        # is None/empty would otherwise be rendered as "Transferred to None".
        speaker = getattr(reply, "author_name", None) or "agent"
        history.append(ChatMessage(role=ChatRole.USER, text=f"Transferred to {speaker}"))

    # Add agent response to context
    history.append(reply)

    # Continue with inner loop
    await self._run_inner_loop(context)
of review + MagenticResponseMessage | MagenticRequestMessage | MagenticPlanReviewRequest + ], + ) -> None: + if getattr(self, "_terminated", False): + return + if self._context is None: + return + + human = response.data + if human is None: + # Defensive fallback: treat as revise with empty comments + human = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.REVISE, comments="") + + if human.decision == MagenticPlanReviewDecision.APPROVE: + # Close the review loop on approval (no further plan review requests this run) + self._require_plan_signoff = False + # If the user supplied an edited plan, adopt it + if human.edited_plan_text: + # Update the manager's internal ledger and rebuild the combined message + mgr_ledger = getattr(self._manager, "task_ledger", None) + if mgr_ledger is not None: + mgr_ledger.plan.text = human.edited_plan_text + team_text = _team_block(self._participants) + combined = self._manager.task_ledger_full_prompt.format( + task=self._context.task.text, + team=team_text, + facts=(mgr_ledger.facts.text if mgr_ledger else ""), + plan=human.edited_plan_text, + ) + self._task_ledger = ChatMessage( + role=ChatRole.ASSISTANT, + text=combined, + author_name=MAGENTIC_MANAGER_NAME, + ) + # If approved with comments but no edited text, apply comments via replan and proceed (no extra review) + elif human.comments: + # Record the human feedback for grounding + self._context.chat_history.append( + ChatMessage(role=ChatRole.USER, text=f"Human plan feedback: {human.comments}") + ) + # Ask the manager to replan based on comments; proceed immediately + self._task_ledger = await self._manager.replan(self._context.model_copy(deep=True)) + + # Record the signed-off plan (no broadcast) + if self._task_ledger: + self._context.chat_history.append(self._task_ledger) + if self._message_callback: + with contextlib.suppress(Exception): + await self._message_callback(self.id, self._task_ledger, ORCH_MSG_KIND_TASK_LEDGER) + + # Enter the normal coordination loop + 
ctx2 = cast( + WorkflowContext[MagenticResponseMessage | MagenticRequestMessage], + context, + ) + await self._run_inner_loop(ctx2) + return + + # Otherwise, REVISION round + self._plan_review_round += 1 + if self._plan_review_round > self._max_plan_review_rounds: + logger.warning("Magentic Orchestrator: Max plan review rounds reached. Proceeding with current plan.") + # Stop any further plan review requests for the rest of this run + self._require_plan_signoff = False + # Add a clear note to the conversation so users know review is closed + notice = ChatMessage( + role=ChatRole.ASSISTANT, + text=( + "Plan review closed after max rounds. Proceeding with the current plan and will no longer " + "prompt for plan approval." + ), + author_name=MAGENTIC_MANAGER_NAME, + ) + self._context.chat_history.append(notice) + if self._message_callback: + with contextlib.suppress(Exception): + await self._message_callback(self.id, notice, ORCH_MSG_KIND_NOTICE) + if self._task_ledger: + self._context.chat_history.append(self._task_ledger) + # No further review requests; proceed directly into coordination + ctx2 = cast( + WorkflowContext[MagenticResponseMessage | MagenticRequestMessage], + context, + ) + await self._run_inner_loop(ctx2) + return + + # If the user provided an edited plan, adopt it directly and ask them to confirm once more + if human.edited_plan_text: + mgr_ledger2 = getattr(self._manager, "task_ledger", None) + if mgr_ledger2 is not None: + mgr_ledger2.plan.text = human.edited_plan_text + # Rebuild combined message for preview in the next review request + team_text = _team_block(self._participants) + combined = self._manager.task_ledger_full_prompt.format( + task=self._context.task.text, + team=team_text, + facts=(mgr_ledger2.facts.text if mgr_ledger2 else ""), + plan=human.edited_plan_text, + ) + self._task_ledger = ChatMessage(role=ChatRole.ASSISTANT, text=combined, author_name=MAGENTIC_MANAGER_NAME) + await self._send_plan_review_request(context) + return + + # 
async def _run_inner_loop(
    self,
    context: WorkflowContext[MagenticResponseMessage | MagenticRequestMessage],
) -> None:
    """Run one coordination round, serialized through ``_inner_loop_lock``.

    The round itself (``_run_inner_loop_locked``) can call ``_reset_and_replan``, which goes
    through ``_run_outer_loop`` and back into this method while the lock is still held by the
    same task. ``asyncio.Lock`` is not reentrant, so that nested call would wait on its own
    lock forever. To avoid the self-deadlock, the task that currently owns the lock is
    remembered, and a nested call from that same task runs the round directly.

    Args:
        context: Workflow context used by the round to send messages and emit events.

    Raises:
        RuntimeError: If the orchestration context or the task ledger has not been set yet.
    """
    if self._context is None or self._task_ledger is None:
        raise RuntimeError("Context or task ledger not initialized")

    current = asyncio.current_task()
    # getattr: the owner marker is created lazily, the first time the lock is taken.
    if current is not None and getattr(self, "_inner_loop_owner", None) is current:
        # Re-entrant call from inside the locked section (reset/replan path).
        await self._run_inner_loop_locked(context)
        return

    async with self._inner_loop_lock:
        self._inner_loop_owner = current
        try:
            await self._run_inner_loop_locked(context)
        finally:
            # Clear ownership before the lock is released so a waiting task never sees a stale owner.
            self._inner_loop_owner = None
async def _reset_and_replan(
    self,
    context: WorkflowContext[MagenticResponseMessage | MagenticRequestMessage],
) -> None:
    """Reset the orchestration context, replan, reset the agents and restart coordination.

    Steps, in order: reset the shared context, ask the manager to replan from a deep copy of
    it, reset every registered agent executor (best effort; a failing ``reset()`` is ignored),
    then continue through ``_run_outer_loop``. Returns immediately when no context exists.

    Args:
        context: Workflow context passed on to the outer loop.
    """
    shared = self._context
    if shared is None:
        return

    logger.info("Magentic Orchestrator: Resetting and replanning")

    # NOTE(review): MagenticContext.reset() is defined outside this section; presumably it
    # clears per-run state and counts the reset -- confirm.
    shared.reset()

    # The manager works on a deep copy of the freshly reset context.
    self._task_ledger = await self._manager.replan(shared.model_copy(deep=True))

    # Agents are reset directly through the registry; no workflow messages are involved.
    for participant_executor in self._agent_executors.values():
        with contextlib.suppress(Exception):
            participant_executor.reset()

    await self._run_outer_loop(context)
async def _send_plan_review_request(
    self,
    context: WorkflowContext[MagenticResponseMessage | MagenticRequestMessage | MagenticPlanReviewRequest],
) -> None:
    """Send a ``MagenticPlanReviewRequest`` describing the current task, facts and plan.

    Nothing is sent when plan sign-off is disabled (for example after the review rounds ran
    out). Facts and plan are read from the manager's ``task_ledger`` when it has one, and
    default to empty strings otherwise; the task text defaults to an empty string when no
    context exists.

    Args:
        context: Workflow context used to send the request.
    """
    if not self._require_plan_signoff:
        return

    ledger = getattr(self._manager, "task_ledger", None)
    if ledger:
        facts_text = ledger.facts.text
        plan_text = ledger.plan.text
    else:
        facts_text = plan_text = ""

    task_text = ""
    if self._context:
        task_text = self._context.task.text

    await context.send_message(
        MagenticPlanReviewRequest(
            task_text=task_text,
            facts_text=facts_text,
            plan_text=plan_text,
            round_index=self._plan_review_round,
        )
    )
@handler
async def handle_response_message(
    self, message: MagenticResponseMessage, context: WorkflowContext[MagenticResponseMessage]
) -> None:
    """Record a broadcast or targeted response message in this agent's chat history.

    A message is ignored when it names a different ``target_agent`` and is not a broadcast.
    For bodies whose role is not USER, a USER "Transferred to <author>" marker is appended
    before the body itself.

    Args:
        message: The response message; ``message.body`` is the chat message to record.
        context: Workflow context (not used by this handler).
    """
    logger.debug("Agent %s: Received response message", self._agent_id)

    # Check if this message is intended for this agent
    if message.target_agent is not None and message.target_agent != self._agent_id and not message.broadcast:
        # Message is targeted to a different agent, ignore it
        logger.debug("Agent %s: Ignoring message targeted to %s", self._agent_id, message.target_agent)
        return

    body = message.body

    # Add transfer message if needed
    if body.role != ChatRole.USER:
        # getattr's default only covers a *missing* attribute. An author_name that exists but
        # is None/empty would otherwise be rendered as "Transferred to None".
        speaker = getattr(body, "author_name", None) or "agent"
        self._chat_history.append(ChatMessage(role=ChatRole.USER, text=f"Transferred to {speaker}"))

    # Add message to agent's history
    self._chat_history.append(body)
@handler
async def handle_request_message(
    self, message: MagenticRequestMessage, context: WorkflowContext[MagenticResponseMessage]
) -> None:
    """Answer an orchestrator request addressed to this agent.

    Requests naming another agent are ignored. Otherwise a persona-adoption message and, when
    present, the orchestrator's instruction (as a USER message) are appended to the history.
    The wrapped agent is then invoked and its response is sent back. Participants that are
    not ``AgentBase`` instances are not invoked; a fixed explanatory response is sent instead.

    Any exception raised while producing or sending the response is logged and replaced by an
    error response, so the orchestrator still receives a reply.

    Args:
        message: The request; ``agent_name`` selects the target, ``instruction`` is the prompt.
        context: Workflow context used to send the response.
    """
    if message.agent_name != self._agent_id:
        return

    logger.info("Agent %s: Received request to respond", self._agent_id)

    # Add persona adoption message with appropriate role
    persona_role = self._get_persona_adoption_role()
    persona_msg = ChatMessage(
        role=persona_role,
        text=f"Transferred to {self._agent_id}, adopt the persona immediately.",
    )
    self._chat_history.append(persona_msg)

    # Add the orchestrator's instruction as a USER message so the agent treats it as the prompt
    if message.instruction:
        self._chat_history.append(ChatMessage(role=ChatRole.USER, text=message.instruction))
    try:
        # If the participant is not an invokable AgentBase, return a no-op response.
        from agent_framework import AgentBase as _AF_AgentBase  # local import to avoid cycles

        if not isinstance(self._agent, _AF_AgentBase):
            response = ChatMessage(
                role=ChatRole.ASSISTANT,
                text=f"{self._agent_id} is a workflow executor and cannot be invoked directly.",
                author_name=self._agent_id,
            )
        else:
            # Invoke the agent
            response = await self._invoke_agent()
            self._chat_history.append(response)

        # Send response back to orchestrator
        await context.send_message(MagenticResponseMessage(body=response))

    except Exception as e:
        logger.warning("Agent %s invoke failed: %s", self._agent_id, e)
        # Fallback response. It carries this agent's name like every other response built
        # here, so receivers can attribute it instead of seeing an author of None.
        response = ChatMessage(
            role=ChatRole.ASSISTANT,
            text=f"Agent {self._agent_id}: Error processing request - {str(e)[:100]}",
            author_name=self._agent_id,
        )
        self._chat_history.append(response)
        await context.send_message(MagenticResponseMessage(body=response))
+ agent = cast("AIAgent", self._agent) + async for update in agent.run_streaming(messages=self._chat_history): # type: ignore[attr-defined] + updates.append(update) + if self._streaming_agent_response_callback is not None: + with contextlib.suppress(Exception): + await self._streaming_agent_response_callback( + self._agent_id, + update, + False, + ) + + run_result: AgentRunResponse = AgentRunResponse.from_agent_run_response_updates(updates) + + # mark final using last update if available + if updates and self._streaming_agent_response_callback is not None: + with contextlib.suppress(Exception): + await self._streaming_agent_response_callback(self._agent_id, updates[-1], True) + messages: list[ChatMessage] | None = None + with contextlib.suppress(Exception): + messages = list(run_result.messages) # type: ignore[assignment] + if messages and len(messages) > 0: + last: ChatMessage = messages[-1] + author = last.author_name or self._agent_id + role: ChatRole = last.role if last.role else ChatRole.ASSISTANT + text = last.text or str(last) + msg = ChatMessage(role=role, text=text, author_name=author) + if self._agent_response_callback is not None: + with contextlib.suppress(Exception): + await self._agent_response_callback(self._agent_id, msg) + return msg + + msg = ChatMessage( + role=ChatRole.ASSISTANT, + text=f"Agent {self._agent_id}: No output produced", + author_name=self._agent_id, + ) + if self._agent_response_callback is not None: + with contextlib.suppress(Exception): + await self._agent_response_callback(self._agent_id, msg) + return msg + + +# endregion Magentic Executors + +# region Magentic Workflow Builder + + +class MagenticBuilder: + """High-level builder for creating Magentic One workflows.""" + + def __init__(self) -> None: + self._participants: dict[str, AIAgent | Executor] = {} + self._manager: MagenticManagerBase | None = None + self._exception_callback: Callable[[Exception], None] | None = None + self._result_callback: Callable[[ChatMessage], 
def with_standard_manager(
    self,
    manager: MagenticManagerBase | None = None,
    *,
    # Constructor args for StandardMagenticManager when manager is not provided
    chat_client: ChatClient | None = None,
    task_ledger: MagenticTaskLedger | None = None,
    instructions: str | None = None,
    # Prompt overrides
    task_ledger_facts_prompt: str | None = None,
    task_ledger_plan_prompt: str | None = None,
    task_ledger_full_prompt: str | None = None,
    task_ledger_facts_update_prompt: str | None = None,
    task_ledger_plan_update_prompt: str | None = None,
    progress_ledger_prompt: str | None = None,
    final_answer_prompt: str | None = None,
    # Limits
    max_stall_count: int = 3,
    max_reset_count: int | None = None,
    max_round_count: int | None = None,
) -> Self:
    """Set the manager that drives the workflow.

    Two ways to call it:
        - Pass a ready-made manager (any ``MagenticManagerBase``, including a preconfigured
          ``StandardMagenticManager``):
            builder.with_standard_manager(my_manager)
        - Pass keyword arguments and let the builder create a ``StandardMagenticManager``:
            builder.with_standard_manager(chat_client=my_client, max_round_count=10, max_stall_count=3)

    When ``manager`` is given it is used as-is and every keyword argument is ignored.

    Args:
        manager: Existing manager instance to use unchanged.
        chat_client: Chat client for the constructed manager; required when ``manager`` is omitted.
        task_ledger: Forwarded to ``StandardMagenticManager``.
        instructions: Forwarded to ``StandardMagenticManager``.
        task_ledger_facts_prompt: Prompt override, forwarded.
        task_ledger_plan_prompt: Prompt override, forwarded.
        task_ledger_full_prompt: Prompt override, forwarded.
        task_ledger_facts_update_prompt: Prompt override, forwarded.
        task_ledger_plan_update_prompt: Prompt override, forwarded.
        progress_ledger_prompt: Prompt override, forwarded.
        final_answer_prompt: Prompt override, forwarded.
        max_stall_count: Stall limit, forwarded.
        max_reset_count: Reset limit, forwarded.
        max_round_count: Round limit, forwarded.

    Returns:
        This builder, for chaining.

    Raises:
        ValueError: If neither ``manager`` nor ``chat_client`` is provided.
    """
    if manager is None:
        if chat_client is None:
            raise ValueError(
                "chat_client is required when manager is not provided: with_standard_manager(chat_client=...)"
            )
        manager = StandardMagenticManager(
            chat_client=chat_client,
            task_ledger=task_ledger,
            instructions=instructions,
            task_ledger_facts_prompt=task_ledger_facts_prompt,
            task_ledger_plan_prompt=task_ledger_plan_prompt,
            task_ledger_full_prompt=task_ledger_full_prompt,
            task_ledger_facts_update_prompt=task_ledger_facts_update_prompt,
            task_ledger_plan_update_prompt=task_ledger_plan_update_prompt,
            progress_ledger_prompt=progress_ledger_prompt,
            final_answer_prompt=final_answer_prompt,
            max_stall_count=max_stall_count,
            max_reset_count=max_reset_count,
            max_round_count=max_round_count,
        )

    self._manager = manager
    return self
+ + mode=STREAMING yields AgentDeltaEvent plus AgentMessageEvent at the end. + mode=NON_STREAMING only yields AgentMessageEvent at the end (no deltas). + """ + self._unified_callback = callback + self._callback_mode = mode + return self + + def build(self) -> "MagenticWorkflow": + """Build a Magentic workflow with the orchestrator and all agent executors.""" + if not self._participants: + raise ValueError("No participants added to Magentic workflow") + + if self._manager is None: + raise ValueError("No manager configured. Call with_standard_manager(...) before build().") + + logger.info("Building Magentic workflow with %d participants", len(self._participants)) + + # Create participant descriptions + participant_descriptions: dict[str, str] = {} + for name, participant in self._participants.items(): + if isinstance(participant, AgentBase): + description = getattr(participant, "description", None) or f"Agent {name}" + else: + description = f"Executor {name}" + participant_descriptions[name] = description + + # If unified sink is provided, map it to legacy callback surfaces + unified = self._unified_callback + mode = self._callback_mode + + if unified is not None: + prior_result = self._result_callback + + async def _on_result(msg: ChatMessage) -> None: + with contextlib.suppress(Exception): + await unified(MagenticFinalResultEvent(message=msg)) + if prior_result is not None: + with contextlib.suppress(Exception): + await prior_result(msg) + + async def _on_orch(orch_id: str, msg: ChatMessage, kind: str) -> None: + with contextlib.suppress(Exception): + await unified(MagenticOrchestratorMessageEvent(orchestrator_id=orch_id, message=msg, kind=kind)) + + async def _on_agent_final(agent_id: str, message: ChatMessage) -> None: + with contextlib.suppress(Exception): + await unified(MagenticAgentMessageEvent(agent_id=agent_id, message=message)) + + async def _on_agent_delta(agent_id: str, update: AgentRunResponseUpdate, is_final: bool) -> None: + if mode == 
MagenticCallbackMode.STREAMING: + # TODO(evmattso): Make sure we surface other non-text streaming items + # (or per-type events) and plumb through consumers. + chunk: str | None = getattr(update, "text", None) + if not chunk: + with contextlib.suppress(Exception): + contents = getattr(update, "contents", []) or [] + chunk = "".join(getattr(c, "text", "") for c in contents) or None + if chunk: + with contextlib.suppress(Exception): + await unified( + MagenticAgentDeltaEvent( + agent_id=agent_id, + text=chunk, + role=getattr(update, "role", None), + ) + ) + # Emit function call/result items if present on the update + with contextlib.suppress(Exception): + content_items = getattr(update, "contents", []) or [] + for item in content_items: + if isinstance(item, FunctionCallContent): + await unified( + MagenticAgentDeltaEvent( + agent_id=agent_id, + function_call_id=getattr(item, "call_id", None), + function_call_name=getattr(item, "name", None), + function_call_arguments=getattr(item, "arguments", None), + role=getattr(update, "role", None), + ) + ) + elif isinstance(item, FunctionResultContent): + await unified( + MagenticAgentDeltaEvent( + agent_id=agent_id, + function_result_id=getattr(item, "call_id", None), + function_result=getattr(item, "result", None), + role=getattr(update, "role", None), + ) + ) + # final aggregation handled by _on_agent_final via agent_response_callback + + # Override delegates for orchestrator and agent callbacks + self._result_callback = _on_result + self._message_callback = _on_orch + self._agent_response_callback = _on_agent_final + self._agent_streaming_callback = _on_agent_delta if mode == MagenticCallbackMode.STREAMING else None + + # Create orchestrator executor + orchestrator_executor = MagenticOrchestratorExecutor( + manager=self._manager, + participants=participant_descriptions, + result_callback=self._result_callback, + message_callback=self._message_callback, + agent_response_callback=self._agent_response_callback, + 
streaming_agent_response_callback=self._agent_streaming_callback, + require_plan_signoff=self._enable_plan_review, + ) + + # Create workflow builder and set orchestrator as start + workflow_builder = WorkflowBuilder().set_start_executor(orchestrator_executor) + + if self._enable_plan_review: + from ._executor import RequestInfoExecutor + + request_info = RequestInfoExecutor() + workflow_builder = ( + workflow_builder + # Only route plan review asks to request_info + .add_edge( + orchestrator_executor, + request_info, + condition=lambda msg: isinstance(msg, MagenticPlanReviewRequest), + ).add_edge(request_info, orchestrator_executor) + ) + + def _route_to_agent(msg: object, *, agent_name: str) -> bool: + """Route only messages meant for this agent. + + - MagenticRequestMessage -> only to the named agent + - MagenticResponseMessage -> broadcast=True to all, or target_agent==agent_name + Everything else (e.g., RequestInfoMessage) -> do not route to agents. + """ + if isinstance(msg, MagenticRequestMessage): + return msg.agent_name == agent_name + if isinstance(msg, MagenticResponseMessage): + return bool(getattr(msg, "broadcast", False)) or getattr(msg, "target_agent", None) == agent_name + return False + + # Add agent executors and connect them + for name, participant in self._participants.items(): + agent_executor = MagenticAgentExecutor( + participant, + name, + agent_response_callback=self._agent_response_callback, + streaming_agent_response_callback=self._agent_streaming_callback, + ) + # Register for internal control (e.g., reset) + orchestrator_executor.register_agent_executor(name, agent_executor) + + # Add bidirectional edges between orchestrator and agent + def _cond(msg: object, _an: str = name) -> bool: + return _route_to_agent(msg, agent_name=_an) + + workflow_builder = workflow_builder.add_edge( + orchestrator_executor, + agent_executor, + condition=_cond, + ).add_edge(agent_executor, orchestrator_executor) + + return 
class MagenticWorkflow:
    """Thin wrapper around a built ``Workflow`` with Magentic-specific conveniences.

    The wrapper converts plain strings and ``ChatMessage`` objects into
    ``MagenticStartMessage`` instances before handing them to the wrapped
    workflow, and forwards any attribute it does not define itself to that
    workflow.
    """

    def __init__(self, workflow: Workflow, task_text: str | None = None):
        """Store the wrapped workflow and an optional preset task.

        Args:
            workflow: The underlying workflow to run.
            task_text: Optional task text used by ``run``/``run_streaming`` when
                they are called without a message.
        """
        self._workflow = workflow
        self._task_text = task_text

    @property
    def workflow(self) -> Workflow:
        """Access the underlying workflow."""
        return self._workflow

    async def run_streaming_with_string(self, task_text: str) -> AsyncIterable[WorkflowEvent]:
        """Run the workflow with a task string.

        Args:
            task_text: The task description as a string.

        Yields:
            WorkflowEvent: The events generated during the workflow execution.
        """
        start_message = MagenticStartMessage.from_string(task_text)
        async for event in self._workflow.run_streaming(start_message):
            yield event

    async def run_streaming_with_message(self, task_message: ChatMessage) -> AsyncIterable[WorkflowEvent]:
        """Run the workflow with a ChatMessage.

        Args:
            task_message: The task as a ChatMessage.

        Yields:
            WorkflowEvent: The events generated during the workflow execution.
        """
        start_message = MagenticStartMessage(task=task_message)
        async for event in self._workflow.run_streaming(start_message):
            yield event

    async def run_streaming(self, message: Any | None = None) -> AsyncIterable[WorkflowEvent]:
        """Run the workflow with either a message object or the preset task string.

        Args:
            message: The message to send. ``str`` and ``ChatMessage`` values are
                wrapped in a ``MagenticStartMessage``; any other object is passed
                to the workflow unchanged. If None, the task text given at
                construction time is used.

        Yields:
            WorkflowEvent: The events generated during the workflow execution.

        Raises:
            ValueError: If ``message`` is None and no preset task text exists.
        """
        if message is None:
            if self._task_text is None:
                raise ValueError("No message provided and no preset task text available")
            message = MagenticStartMessage.from_string(self._task_text)
        elif isinstance(message, str):
            message = MagenticStartMessage.from_string(message)
        elif isinstance(message, ChatMessage):
            message = MagenticStartMessage(task=message)

        async for event in self._workflow.run_streaming(message):
            yield event

    async def run_with_string(self, task_text: str) -> WorkflowRunResult:
        """Run the workflow with a task string and return all events.

        Args:
            task_text: The task description as a string.

        Returns:
            WorkflowRunResult: All events generated during the workflow execution.
        """
        return WorkflowRunResult([event async for event in self.run_streaming_with_string(task_text)])

    async def run_with_message(self, task_message: ChatMessage) -> WorkflowRunResult:
        """Run the workflow with a ChatMessage and return all events.

        Args:
            task_message: The task as a ChatMessage.

        Returns:
            WorkflowRunResult: All events generated during the workflow execution.
        """
        return WorkflowRunResult([event async for event in self.run_streaming_with_message(task_message)])

    async def run(self, message: Any | None = None) -> WorkflowRunResult:
        """Run the workflow and return all events.

        Args:
            message: The message to send. If None and task_text was provided during
                construction, uses the preset task string.

        Returns:
            WorkflowRunResult: All events generated during the workflow execution.
        """
        return WorkflowRunResult([event async for event in self.run_streaming(message)])

    async def send_responses_streaming(self, responses: dict[str, Any]) -> AsyncIterable[WorkflowEvent]:
        """Forward responses to pending requests and stream resulting events.

        This delegates to the underlying Workflow implementation.
        """
        async for event in self._workflow.send_responses_streaming(responses):
            yield event

    async def send_responses(self, responses: dict[str, Any]) -> WorkflowRunResult:
        """Forward responses to pending requests and return all resulting events.

        This delegates to the underlying Workflow implementation.
        """
        return await self._workflow.send_responses(responses)

    def __getattr__(self, name: str) -> Any:
        """Delegate unknown attributes to the underlying workflow.

        Raises:
            AttributeError: If the wrapper has no wrapped workflow yet, or the
                wrapped workflow does not define ``name``.
        """
        # __getattr__ only runs after normal lookup failed. Reading self._workflow
        # here would re-enter __getattr__ forever when the attribute is not set
        # (e.g. on an instance created through __new__ by copy/pickle), so fetch
        # it without triggering the fallback and fail with a plain AttributeError.
        try:
            workflow = object.__getattribute__(self, "_workflow")
        except AttributeError:
            raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}") from None
        return getattr(workflow, name)
def test_magentic_start_message_from_string():
    """``from_string`` wraps plain text in a user-role ChatMessage task."""
    task_text = "Do the thing"

    start = MagenticStartMessage.from_string(task_text)

    assert isinstance(start, MagenticStartMessage)
    task = start.task
    assert isinstance(task, ChatMessage)
    assert task.role == ChatRole.USER
    assert task.text == task_text
class FakeManager(MagenticManagerBase):
    """Deterministic manager for tests that avoids real LLM calls."""

    # Last facts/plan pair produced by plan() or replan(); None until one of them runs.
    task_ledger: _SimpleLedger | None = None
    # When True, the request is reported satisfied as soon as chat history is non-empty.
    satisfied_after_signoff: bool = True
    next_speaker_name: str = "agentA"
    instruction_text: str = "Proceed with step 1"

    async def plan(self, magentic_context: MagenticContext) -> ChatMessage:
        """Record a fixed initial ledger and return it rendered as one message."""
        ledger = _SimpleLedger(
            facts=ChatMessage(role=ChatRole.ASSISTANT, text="GIVEN OR VERIFIED FACTS\n- A\n"),
            plan=ChatMessage(role=ChatRole.ASSISTANT, text="- Do X\n- Do Y\n"),
        )
        self.task_ledger = ledger
        return ChatMessage(
            role=ChatRole.ASSISTANT,
            text=f"Task: {magentic_context.task.text}\n\nFacts:\n{ledger.facts.text}\n\nPlan:\n{ledger.plan.text}",
            author_name="magentic_manager",
        )

    async def replan(self, magentic_context: MagenticContext) -> ChatMessage:
        """Record a fixed revised ledger (facts "A2", plan "Do Z") and return it rendered."""
        ledger = _SimpleLedger(
            facts=ChatMessage(role=ChatRole.ASSISTANT, text="GIVEN OR VERIFIED FACTS\n- A2\n"),
            plan=ChatMessage(role=ChatRole.ASSISTANT, text="- Do Z\n"),
        )
        self.task_ledger = ledger
        return ChatMessage(
            role=ChatRole.ASSISTANT,
            text=f"Task: {magentic_context.task.text}\n\nFacts:\n{ledger.facts.text}\n\nPlan:\n{ledger.plan.text}",
            author_name="magentic_manager",
        )

    async def create_progress_ledger(self, magentic_context: MagenticContext) -> MagenticProgressLedger:
        """Build a progress ledger from the manager's configured fields."""
        satisfied = self.satisfied_after_signoff and len(magentic_context.chat_history) > 0
        answers = {
            "is_request_satisfied": satisfied,
            "is_in_loop": False,
            "is_progress_being_made": True,
            "next_speaker": self.next_speaker_name,
            "instruction_or_question": self.instruction_text,
        }
        return MagenticProgressLedger(**{
            field: MagenticProgressLedgerItem(reason="test", answer=value) for field, value in answers.items()
        })

    async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatMessage:
        """Return a constant final answer."""
        return ChatMessage(role=ChatRole.ASSISTANT, text="FINAL", author_name="magentic_manager")
async def test_magentic_plan_review_approve_with_comments_replans_and_proceeds():
    """Approving a plan *with comments* triggers a replan but no second review round."""

    class CountingManager(FakeManager):
        # Declare as a model field so assignment is allowed under Pydantic
        replan_count: int = 0

        def __init__(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]
            super().__init__(*args, **kwargs)

        async def replan(self, magentic_context: MagenticContext) -> ChatMessage:  # type: ignore[override]
            self.replan_count += 1
            return await super().replan(magentic_context)

    def _is_plan_review(event: object) -> bool:
        return isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest

    manager = CountingManager(max_round_count=10)
    builder = MagenticBuilder().participants(agentA=_DummyExec("agentA"))
    wf = builder.with_standard_manager(manager).with_plan_review().build()

    # Drain the first run; the last plan review request seen is the one we answer.
    req_event: RequestInfoEvent | None = None
    async for ev in wf.run_streaming("do work"):
        if _is_plan_review(ev):
            req_event = ev
    assert req_event is not None

    # Reply APPROVE with comments (no edited text). Expect one replan and no second review round.
    reply = MagenticPlanReviewReply(
        decision=MagenticPlanReviewDecision.APPROVE,
        comments="Looks good; consider Z",
    )
    saw_second_review = False
    completed: WorkflowCompletedEvent | None = None
    async for ev in wf.send_responses_streaming({req_event.request_id: reply}):
        if _is_plan_review(ev):
            saw_second_review = True
        if isinstance(ev, WorkflowCompletedEvent):
            completed = ev
            break

    assert completed is not None
    assert manager.replan_count >= 1
    assert saw_second_review is False

    # Replan from FakeManager updates facts/plan to include A2 / Do Z
    ledger = manager.task_ledger
    assert ledger is not None
    combined_text = (ledger.facts.text or "") + (ledger.plan.text or "")
    assert "A2" in combined_text or "Do Z" in combined_text
async def test_standard_manager_plan_and_replan_via_complete_monkeypatch():
    """plan()/replan() combine the patched ``_complete`` outputs into one ledger message."""

    def _scripted_complete(marker: str, when_found: str, otherwise: str):
        # Build a stand-in for ``_complete`` whose reply depends on whether any
        # incoming message text contains ``marker``.
        async def _fake_complete(messages: list[ChatMessage], **kwargs: Any) -> ChatMessage:
            found = any(marker in (m.text or "") for m in messages)
            return ChatMessage(role=ChatRole.ASSISTANT, text=when_found if found else otherwise)

        return _fake_complete

    mgr = StandardMagenticManager(chat_client=_StubChatClient())

    # First, patch to produce facts then plan
    mgr._complete = _scripted_complete(  # type: ignore[attr-defined]
        "FACTS",
        "- step A\n- step B",
        "GIVEN OR VERIFIED FACTS\n- fact1",
    )

    ctx = MagenticContext(
        task=ChatMessage(role=ChatRole.USER, text="T"),
        participant_descriptions={"A": "desc"},
    )
    planned = await mgr.plan(ctx.model_copy(deep=True))

    # Assert structural headings and that steps appear in the combined ledger output.
    for heading in (
        "We are working to address the following user request:",
        "Here is the plan to follow as best as possible:",
    ):
        assert heading in planned.text
    assert "- step" in planned.text

    # Now replan with new outputs
    mgr._complete = _scripted_complete(  # type: ignore[attr-defined]
        "Please briefly explain",
        "- new step",
        "GIVEN OR VERIFIED FACTS\n- updated",
    )
    replanned = await mgr.replan(ctx.model_copy(deep=True))
    assert any(fragment in replanned.text for fragment in ("updated", "new step"))
class InvokeOnceManager(MagenticManagerBase):
    """Test manager that asks agentA to speak once, then reports the request as satisfied."""

    def __init__(self) -> None:
        super().__init__(max_round_count=5, max_stall_count=3, max_reset_count=2)
        self._invoked = False

    async def plan(self, magentic_context: MagenticContext) -> ChatMessage:
        """Return a constant ledger message."""
        return ChatMessage(role=ChatRole.ASSISTANT, text="ledger")

    async def replan(self, magentic_context: MagenticContext) -> ChatMessage:
        """Return a constant revised ledger message."""
        return ChatMessage(role=ChatRole.ASSISTANT, text="re-ledger")

    async def create_progress_ledger(self, magentic_context: MagenticContext) -> MagenticProgressLedger:
        """Return "not satisfied / say hi" on the first call and "satisfied / done" afterwards."""

        def _item(answer: object) -> MagenticProgressLedgerItem:
            return MagenticProgressLedgerItem(reason="r", answer=answer)

        first_round = not self._invoked
        if first_round:
            # First round: ask agentA to respond
            self._invoked = True

        # Later rounds: mark satisfied so the run can conclude
        return MagenticProgressLedger(
            is_request_satisfied=_item(not first_round),
            is_in_loop=_item(False),
            is_progress_being_made=_item(True),
            next_speaker=_item("agentA"),
            instruction_or_question=_item("say hi" if first_round else "done"),
        )

    async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatMessage:
        """Return a constant final answer."""
        return ChatMessage(role=ChatRole.ASSISTANT, text="final")
async def _collect_agent_responses_setup(participant_obj: object):
    """Run a one-participant workflow and return the agent messages seen by the event sink.

    The participant is registered as ``agentA`` with an ``InvokeOnceManager``; the
    stream is abandoned after more than 50 events as a safety bound.
    """
    agent_messages: list[ChatMessage] = []

    async def sink(event) -> None:  # type: ignore[no-untyped-def]
        from agent_framework_workflow._magentic import MagenticAgentMessageEvent

        if not isinstance(event, MagenticAgentMessageEvent):
            return
        if event.message is None:
            return
        agent_messages.append(event.message)

    builder = MagenticBuilder().participants(agentA=participant_obj)  # type: ignore[arg-type]
    builder = builder.with_standard_manager(InvokeOnceManager())
    wf = builder.on_event(sink).build()  # type: ignore

    # Run a bounded stream to allow one invoke and then completion
    seen = 0
    async for _ in wf.run_streaming("task"):  # plan review disabled
        seen += 1
        if seen > 50:
            break

    return agent_messages
async def test_agent_executor_invoke_with_assistants_client_messages():
    """An agent holding an assistants-style client still yields an agentA message containing "ok"."""
    messages = await _collect_agent_responses_setup(StubAssistantsAgent())

    from_agent_a = [m for m in messages if m.author_name == "agentA" and "ok" in (m.text or "")]
    assert from_agent_a
async def main() -> None:
    """Build the two-agent Magentic workflow, run the sample task and print what happens.

    Intermediate events are rendered by the unified ``on_event`` callback; the
    events yielded by ``run_streaming`` are also printed, followed by a preview
    of the completion payload. Any exception raised while running is logged
    with its traceback and reported on stdout instead of propagating.
    """
    researcher_agent = ChatClientAgent(
        name="ResearcherAgent",
        description="Specialist in research and information gathering",
        instructions=(
            "You are a Researcher. You find information without additional computation or quantitative analysis."
        ),
        # This agent requires the gpt-4o-search-preview model to perform web searches.
        # Feel free to explore with other agents that support web search, for example,
        # the `OpenAIResponseAgent` or `AzureAIAgent` with bing grounding.
        chat_client=OpenAIChatClient(ai_model_id="gpt-4o-search-preview"),
    )

    coder_agent = ChatClientAgent(
        name="CoderAgent",
        description="A helpful assistant that writes and executes code to process and analyze data.",
        instructions="You solve questions using code. Please provide detailed analysis and computation process.",
        chat_client=OpenAIResponsesClient(),
        tools=HostedCodeInterpreterTool(),
    )

    # State shared with the on_event callback: which agent is currently being
    # streamed and whether its output line is still open (no trailing newline yet).
    last_stream_agent_id: str | None = None
    stream_line_open: bool = False

    # Unified callback
    async def on_event(event: MagenticCallbackEvent) -> None:
        """
        The `on_event` callback processes events emitted by the workflow.
        Events include: orchestrator messages, agent delta updates, agent messages, and final result events.
        """
        nonlocal last_stream_agent_id, stream_line_open
        if isinstance(event, MagenticOrchestratorMessageEvent):
            print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}")
        elif isinstance(event, MagenticAgentDeltaEvent):
            chunk = getattr(event, "text", None)
            if not chunk:
                # Deltas may describe function calls/results instead of text; printing
                # their missing text would put a literal "None" into the stream.
                return
            if last_stream_agent_id != event.agent_id or not stream_line_open:
                if stream_line_open:
                    print()
                print(f"\n[STREAM:{event.agent_id}]: ", end="", flush=True)
                last_stream_agent_id = event.agent_id
                stream_line_open = True
            print(chunk, end="", flush=True)
        elif isinstance(event, MagenticAgentMessageEvent):
            if stream_line_open:
                print(" (final)")
                stream_line_open = False
                print()
            msg = event.message
            if msg is not None:
                response_text = (msg.text or "").replace("\n", " ")
                print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}")
        elif isinstance(event, MagenticFinalResultEvent):
            print("\n" + "=" * 50)
            print("FINAL RESULT:")
            print("=" * 50)
            if event.message is not None:
                print(event.message.text)
            print("=" * 50)

    print("\nBuilding Magentic Workflow...")

    workflow = (
        MagenticBuilder()
        .participants(researcher=researcher_agent, coder=coder_agent)
        .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
        .with_standard_manager(
            chat_client=OpenAIChatClient(),
            max_round_count=10,
            max_stall_count=3,
            max_reset_count=2,
        )
        .build()
    )

    task = (
        "I am preparing a report on the energy efficiency of different machine learning model architectures. "
        "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
        "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
        "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
        "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
        "per task type (image classification, text classification, and text generation)."
    )

    print(f"\nTask: {task}")
    print("\nStarting workflow execution...")

    try:
        completion_event = None
        async for event in workflow.run_streaming(task):
            print(f"Event: {event}")

            if isinstance(event, WorkflowCompletedEvent):
                completion_event = event

        if completion_event is not None:
            data = getattr(completion_event, "data", None)
            preview = getattr(data, "text", None) or (str(data) if data is not None else "")
            print(f"Workflow completed with result:\n\n{preview}")

    except Exception as e:
        # Sample-level boundary: keep the friendly message but do not lose the traceback.
        logger.exception("Workflow execution failed")
        print(f"Workflow execution failed: {e}")
+
+This sample builds a Magentic workflow with two cooperating agents and enables
+plan review so a human can approve or revise the plan before execution:
+
+- researcher: ChatClientAgent backed by OpenAIChatClient (web/search-capable model)
+- coder: ChatClientAgent backed by OpenAIResponsesClient with the Hosted Code Interpreter tool
+
+Key behaviors demonstrated:
+- with_plan_review(): requests a MagenticPlanReviewRequest before coordination begins
+- Event loop that waits for a RequestInfoEvent carrying a MagenticPlanReviewRequest, prints
+  the plan, then replies with MagenticPlanReviewReply (here we auto-approve, but you can
+  edit/collect input)
+- Callbacks: a unified on_event callback registered in STREAMING mode (orchestrator messages,
+  incremental agent chunks, final agent messages, final result) plus on_exception
+
+Prereqs: configure your OpenAI credentials in the environment so the Chat/Responses
+clients can run. You can swap clients/models as needed.
+"""
+
+
+async def main() -> None:
+    """Build the two-agent Magentic workflow, run the task, and auto-approve plan reviews.
+
+    The task is started exactly once. Every plan review request raised by the workflow
+    is answered with an APPROVE decision until a WorkflowCompletedEvent is observed or
+    the workflow stops asking for reviews. Any exception is printed and logged rather
+    than propagated.
+    """
+    researcher_agent = ChatClientAgent(
+        name="ResearcherAgent",
+        description="Specialist in research and information gathering",
+        instructions=(
+            "You are a Researcher. You find information without additional computation or quantitative analysis."
+        ),
+        # This agent requires the gpt-4o-search-preview model to perform web searches.
+        # Feel free to explore with other agents that support web search, for example,
+        # the `OpenAIResponseAgent` or `AzureAIAgent` with bing grounding.
+        chat_client=OpenAIChatClient(ai_model_id="gpt-4o-search-preview"),
+    )
+
+    coder_agent = ChatClientAgent(
+        name="CoderAgent",
+        description="A helpful assistant that writes and executes code to process and analyze data.",
+        instructions="You solve questions using code. Please provide detailed analysis and computation process.",
+        chat_client=OpenAIResponsesClient(),
+        tools=HostedCodeInterpreterTool(),
+    )
+
+    # Console state shared with on_event (rebound there via `nonlocal`): the agent whose
+    # streamed output was printed last, and whether that output line is still open.
+    last_stream_agent_id: str | None = None
+    stream_line_open: bool = False
+
+    # Callbacks
+    def on_exception(exception: Exception) -> None:
+        """Print the exception and log it with its traceback."""
+        print(f"Exception occurred: {exception}")
+        logger.exception("Workflow exception", exc_info=exception)
+
+    # Unified callback
+    async def on_event(event: MagenticCallbackEvent) -> None:
+        """Render one Magentic callback event on the console.
+
+        Delta events from the same agent are appended to a single open line; a final
+        agent message closes that line before the full message is printed.
+        """
+        nonlocal last_stream_agent_id, stream_line_open
+        if isinstance(event, MagenticOrchestratorMessageEvent):
+            print(f"\n[ORCH:{event.kind}]\n\n{getattr(event.message, 'text', '')}\n{'-' * 26}")
+        elif isinstance(event, MagenticAgentDeltaEvent):
+            # Start a new "[STREAM:...]" header when the agent changes or no line is open.
+            if last_stream_agent_id != event.agent_id or not stream_line_open:
+                if stream_line_open:
+                    print()
+                print(f"\n[STREAM:{event.agent_id}]: ", end="", flush=True)
+                last_stream_agent_id = event.agent_id
+                stream_line_open = True
+            print(event.text, end="", flush=True)
+        elif isinstance(event, MagenticAgentMessageEvent):
+            if stream_line_open:
+                print(" (final)")
+                stream_line_open = False
+                print()
+            msg = event.message
+            if msg is not None:
+                # Flatten newlines so the final message prints as one compact paragraph.
+                response_text = (msg.text or "").replace("\n", " ")
+                print(f"\n[AGENT:{event.agent_id}] {msg.role.value}\n\n{response_text}\n{'-' * 26}")
+        elif isinstance(event, MagenticFinalResultEvent):
+            print("\n" + "=" * 50)
+            print("FINAL RESULT:")
+            print("=" * 50)
+            if event.message is not None:
+                print(event.message.text)
+            print("=" * 50)
+
+    print("\nBuilding Magentic Workflow...")
+
+    workflow = (
+        MagenticBuilder()
+        .participants(researcher=researcher_agent, coder=coder_agent)
+        .on_exception(on_exception)
+        .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
+        .with_standard_manager(
+            chat_client=OpenAIChatClient(),
+            max_round_count=10,
+            max_stall_count=3,
+            max_reset_count=2,
+        )
+        .with_plan_review()
+        .build()
+    )
+
+    task = (
+        "I am preparing a report on the energy efficiency of different machine learning model architectures. "
+        "Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 "
+        "on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). "
+        "Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 "
+        "VM for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model "
+        "per task type (image classification, text classification, and text generation)."
+    )
+
+    print(f"\nTask: {task}")
+    print("\nStarting workflow execution...")
+
+    try:
+        completion_event: WorkflowCompletedEvent | None = None
+        pending_request: RequestInfoEvent | None = None
+
+        # Phase 1: start the task once and run until either completion or a HIL request.
+        async for event in workflow.run_streaming(task):
+            print(f"Event: {event}")
+
+            if isinstance(event, WorkflowCompletedEvent):
+                completion_event = event
+
+            if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest:
+                pending_request = event
+                review_req = cast(MagenticPlanReviewRequest, event.data)
+                if review_req.plan_text:
+                    print(f"\n=== PLAN REVIEW REQUEST ===\n{review_req.plan_text}\n")
+
+        # Phase 2: answer plan review (HIL) requests until the workflow completes or stops
+        # asking. The task is never re-submitted here; restarting it after a reply would
+        # discard the approved plan and loop forever.
+        while completion_event is None and pending_request is not None:
+            # For demo purposes we approve as-is. Replace this with UI input
+            # to collect a human decision/comments/edited plan.
+            reply = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)
+
+            # Track a follow-up request separately so that unrelated events arriving
+            # after it in the same stream cannot clear it.
+            next_request: RequestInfoEvent | None = None
+
+            async for event in workflow.send_responses_streaming({pending_request.request_id: reply}):
+                print(f"Event: {event}")
+
+                if isinstance(event, WorkflowCompletedEvent):
+                    completion_event = event
+
+                if isinstance(event, RequestInfoEvent) and event.request_type is MagenticPlanReviewRequest:
+                    # Another review cycle requested; keep pending
+                    next_request = event
+                    review_req = cast(MagenticPlanReviewRequest, event.data)
+                    if review_req.plan_text:
+                        print(f"\n=== PLAN REVIEW REQUEST ===\n{review_req.plan_text}\n")
+
+            # Clear pending if no new request was raised during this reply cycle.
+            pending_request = next_request
+
+        if completion_event is not None:
+            data = getattr(completion_event, "data", None)
+            preview = getattr(data, "text", None) or (str(data) if data is not None else "")
+            print(f"Workflow completed with result:\n\n{preview}")
+        else:
+            # The stream ended with neither a result nor an outstanding review request.
+            print("Workflow ended without a completion event.")
+
+    except Exception as e:
+        print(f"Workflow execution failed: {e}")
+        on_exception(e)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())