mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
93cbf6b3f0
* Parse structuredContent from MCP CallToolResult (#3313) The _parse_tool_result_from_mcp method only iterated over the content field from CallToolResult, ignoring the structuredContent field entirely. MCP servers that return JSON data via structuredContent (e.g., Power BI MCP) appeared to return None. Add handling for structuredContent: when present, serialize it as JSON text and append it to the result list. This preserves the data for the LLM while maintaining backward compatibility with existing behavior. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Python: Parse MCP CallToolResult.structuredContent field to prevent tool results returning None Fixes #3313 * Address review feedback: add default=str to json.dumps and remove .checkpoints/ - Add default=str to json.dumps for structuredContent serialization so non-JSON-serializable values (e.g. bytes) degrade gracefully instead of raising TypeError - Remove all .checkpoints/ runtime artifacts from the repository - Add **/.checkpoints/ to .gitignore to prevent future accidental commits - Add test for non-serializable structuredContent values Fixes #3313 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Address review feedback for #3313: Python: MCP CallToolResult.structuredContent field is not parsed, causing tool results to return None --------- Co-authored-by: Copilot <copilot@github.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
342 lines
12 KiB
Python
342 lines
12 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
"""Agent runner orchestration for the harness console.
|
|
|
|
This module provides the HarnessAgentRunner class, which orchestrates agent
|
|
invocations with observer lifecycle management. It handles:
|
|
- User input dispatch
|
|
- Agent streaming with observer notifications
|
|
- Follow-up action collection
|
|
- Streaming state management
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from agent_framework import Agent, AgentSession
|
|
|
|
from .app_state import FollowUpAction
|
|
from .observers.base import ConsoleObserver
|
|
from .state_driver import IUXStateDriver
|
|
|
|
|
|
class HarnessAgentRunner:
|
|
"""Orchestrates agent invocations driven by user-input events from the UI.
|
|
|
|
The component invokes the runner's input handlers (run_turn) directly;
|
|
the runner mutates UI state through the supplied IUXStateDriver.
|
|
|
|
This is a minimal implementation focusing on the core agent loop without
|
|
command handling or complex message injection (those can be added later).
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
agent: Agent,
|
|
observers: list[ConsoleObserver],
|
|
state_driver: IUXStateDriver,
|
|
*,
|
|
max_context_window_tokens: int | None = None,
|
|
max_output_tokens: int | None = None,
|
|
) -> None:
|
|
"""Initialize the agent runner.
|
|
|
|
Args:
|
|
agent: The agent to orchestrate.
|
|
observers: List of console observers for lifecycle events.
|
|
state_driver: The UI state driver for observer updates.
|
|
max_context_window_tokens: Optional max context window size for usage display.
|
|
max_output_tokens: Optional max output tokens for usage display.
|
|
"""
|
|
self._agent = agent
|
|
self._observers = observers
|
|
self._ux = state_driver
|
|
self._max_context_window_tokens = max_context_window_tokens
|
|
self._max_output_tokens = max_output_tokens
|
|
self._input_gate = asyncio.Semaphore(1) # Single turn at a time
|
|
|
|
async def run_turn(
|
|
self,
|
|
user_input: str,
|
|
session: AgentSession | None = None,
|
|
) -> None:
|
|
"""Run a single agent turn with the given user input.
|
|
|
|
Echoes the input, then delegates to the agent loop.
|
|
|
|
Args:
|
|
user_input: The user's input text.
|
|
session: Optional agent session for conversation history.
|
|
"""
|
|
async with self._input_gate:
|
|
self._ux.write_user_input_echo(user_input)
|
|
|
|
from agent_framework import Message
|
|
|
|
messages = [Message(role="user", contents=[user_input])]
|
|
await self._run_agent_loop(messages, session)
|
|
|
|
async def start_agent_turn(
|
|
self,
|
|
messages: list,
|
|
session: AgentSession | None = None,
|
|
) -> None:
|
|
"""Resume the agent loop with pre-built messages (from follow-up responses).
|
|
|
|
Called by the app after the user finishes answering follow-up questions.
|
|
If messages is empty, just completes the turn.
|
|
|
|
Args:
|
|
messages: List of Message objects to send to the agent.
|
|
session: Optional agent session.
|
|
"""
|
|
async with self._input_gate:
|
|
if not messages:
|
|
self._complete_turn()
|
|
return
|
|
await self._run_agent_loop(messages, session)
|
|
|
|
async def _run_agent_loop(
|
|
self,
|
|
messages: list,
|
|
session: AgentSession | None,
|
|
) -> None:
|
|
"""Run the agent loop, re-invoking as needed for follow-up messages.
|
|
|
|
Loops while there are messages to send. After each stream:
|
|
- Collects follow-up actions from observers
|
|
- If questions exist → queue them and return (UI will collect answers)
|
|
- If only direct messages → loop with those messages
|
|
- If nothing → complete the turn
|
|
|
|
Args:
|
|
messages: Initial messages to send.
|
|
session: Optional agent session.
|
|
"""
|
|
next_messages = messages
|
|
|
|
while next_messages:
|
|
# Configure run options
|
|
options = self._configure_run_options(session)
|
|
|
|
# Begin streaming
|
|
self._ux.begin_streaming()
|
|
self._ux.begin_streaming_output()
|
|
self._ux.set_show_spinner(True)
|
|
|
|
try:
|
|
await self._stream_response_messages(next_messages, session, options)
|
|
except Exception as ex:
|
|
self._ux.append_info_line(
|
|
f"❌ Stream error: {ex.__class__.__name__}:\n{ex}",
|
|
color="red",
|
|
)
|
|
|
|
# Stop spinner and end streaming output
|
|
self._ux.set_show_spinner(False)
|
|
|
|
# Collect follow-up actions from observers
|
|
follow_up_actions = await self._collect_follow_up_actions(session)
|
|
|
|
# Separate direct messages from questions
|
|
has_follow_ups = len(follow_up_actions) > 0
|
|
|
|
# Write no-text warning if applicable
|
|
await self._ux.write_no_text_warning(has_follow_ups)
|
|
|
|
# Enqueue all follow-up actions
|
|
for action in follow_up_actions:
|
|
self._ux.enqueue_follow_up_action(action)
|
|
|
|
# Check if there are pending questions (UI needs user input)
|
|
if self._ux.has_pending_questions():
|
|
# Pause — the UI will collect answers and call start_agent_turn
|
|
return
|
|
|
|
# No questions — drain any accumulated direct messages and loop
|
|
drained = self._ux.take_follow_up_responses()
|
|
next_messages = drained if drained else None
|
|
|
|
self._complete_turn()
|
|
|
|
def _complete_turn(self) -> None:
|
|
"""Complete the current turn (end streaming)."""
|
|
self._ux.end_streaming()
|
|
|
|
def _configure_run_options(
|
|
self,
|
|
session: AgentSession | None,
|
|
) -> dict:
|
|
"""Configure run options via observers.
|
|
|
|
Each observer can modify the options dict to influence agent behavior.
|
|
|
|
Args:
|
|
session: Optional agent session.
|
|
|
|
Returns:
|
|
Options dict for agent.run().
|
|
"""
|
|
options = {}
|
|
for observer in self._observers:
|
|
observer.configure_run_options(options, self._agent, session)
|
|
return options
|
|
|
|
async def _stream_response(
|
|
self,
|
|
user_input: str,
|
|
session: AgentSession | None,
|
|
options: dict,
|
|
) -> None:
|
|
"""Stream agent response from a text input and dispatch to observers.
|
|
|
|
Args:
|
|
user_input: The user's input text.
|
|
session: Optional agent session.
|
|
options: Run options configured by observers.
|
|
"""
|
|
# Stream response using agent.run(stream=True)
|
|
stream = self._agent.run(
|
|
user_input,
|
|
stream=True,
|
|
session=session,
|
|
options=options,
|
|
)
|
|
|
|
# Process each update chunk
|
|
async for update in stream:
|
|
await self._dispatch_update(update, session)
|
|
|
|
# Extract usage from the final response
|
|
self._extract_usage(stream)
|
|
|
|
async def _stream_response_messages(
|
|
self,
|
|
messages: list,
|
|
session: AgentSession | None,
|
|
options: dict,
|
|
) -> None:
|
|
"""Stream agent response from Message objects and dispatch to observers.
|
|
|
|
Args:
|
|
messages: List of Message objects to send.
|
|
session: Optional agent session.
|
|
options: Run options configured by observers.
|
|
"""
|
|
stream = self._agent.run(
|
|
messages,
|
|
stream=True,
|
|
session=session,
|
|
options=options,
|
|
)
|
|
|
|
async for update in stream:
|
|
await self._dispatch_update(update, session)
|
|
|
|
self._extract_usage(stream)
|
|
|
|
def _extract_usage(self, stream) -> None:
|
|
"""Extract token usage from a completed stream."""
|
|
try:
|
|
get_final = getattr(stream, "get_final_response", None)
|
|
if not get_final:
|
|
return
|
|
|
|
import inspect
|
|
|
|
if inspect.iscoroutinefunction(get_final):
|
|
return
|
|
|
|
final_response = get_final()
|
|
if final_response is None:
|
|
return
|
|
|
|
usage = getattr(final_response, "usage_details", None)
|
|
if not isinstance(usage, dict):
|
|
return
|
|
|
|
input_tokens = usage.get("input_token_count", 0) or 0
|
|
output_tokens = usage.get("output_token_count", 0) or 0
|
|
if input_tokens or output_tokens:
|
|
self._ux.set_usage_text(self._format_usage(input_tokens, output_tokens))
|
|
except (AttributeError, TypeError):
|
|
pass
|
|
|
|
async def _dispatch_update(
|
|
self,
|
|
update, # AgentResponseUpdate
|
|
session: AgentSession | None,
|
|
) -> None:
|
|
"""Dispatch a single update to all observers.
|
|
|
|
Calls observer lifecycle methods in order:
|
|
1. on_response_update (once per update)
|
|
2. on_content (for each content item)
|
|
3. on_text (if text is present)
|
|
|
|
Args:
|
|
update: The agent response update.
|
|
session: Optional agent session.
|
|
"""
|
|
# on_response_update
|
|
for observer in self._observers:
|
|
await observer.on_response_update(self._ux, update, self._agent, session)
|
|
|
|
# on_content for each content item
|
|
if hasattr(update, "contents") and update.contents:
|
|
for content in update.contents:
|
|
for observer in self._observers:
|
|
await observer.on_content(self._ux, content, self._agent, session)
|
|
|
|
# on_text for text chunks
|
|
if hasattr(update, "text") and update.text:
|
|
for observer in self._observers:
|
|
await observer.on_text(self._ux, update.text, self._agent, session)
|
|
|
|
async def _collect_follow_up_actions(
|
|
self,
|
|
session: AgentSession | None,
|
|
) -> list[FollowUpAction]:
|
|
"""Collect follow-up actions from all observers.
|
|
|
|
Called after streaming completes to gather any follow-up questions
|
|
or messages from observers.
|
|
|
|
Args:
|
|
session: Optional agent session.
|
|
|
|
Returns:
|
|
List of follow-up actions from all observers.
|
|
"""
|
|
actions: list[FollowUpAction] = []
|
|
for observer in self._observers:
|
|
observer_actions = await observer.on_stream_complete(self._ux, self._agent, session)
|
|
if observer_actions:
|
|
actions.extend(observer_actions)
|
|
return actions
|
|
|
|
def _format_usage(self, input_tokens: int, output_tokens: int) -> str:
|
|
"""Format token counts matching C# harness style: 📊 Tokens — input: X | output: Y | total: Z."""
|
|
total_tokens = input_tokens + output_tokens
|
|
|
|
input_budget = None
|
|
if self._max_context_window_tokens and self._max_output_tokens:
|
|
input_budget = self._max_context_window_tokens - self._max_output_tokens
|
|
|
|
return (
|
|
f"📊 Tokens — input: {self._format_token_count(input_tokens, input_budget)}"
|
|
f" | output: {self._format_token_count(output_tokens, self._max_output_tokens)}"
|
|
f" | total: {self._format_token_count(total_tokens, self._max_context_window_tokens)}"
|
|
)
|
|
|
|
@staticmethod
|
|
def _format_token_count(count: int, budget: int | None) -> str:
|
|
"""Format a token count, optionally showing budget percentage."""
|
|
if budget and budget > 0:
|
|
pct = count / budget * 100
|
|
return f"{count:,}/{budget:,} ({pct:.1f}%)"
|
|
return f"{count:,}"
|