Files
Giles Odigwe 93cbf6b3f0 Python: Parse MCP CallToolResult.structuredContent field to prevent tool results returning None (#6421)
* Parse structuredContent from MCP CallToolResult (#3313)

The _parse_tool_result_from_mcp method only iterated over the content
field from CallToolResult, ignoring the structuredContent field entirely.
MCP servers that return JSON data via structuredContent (e.g., Power BI
MCP) appeared to return None.

Add handling for structuredContent: when present, serialize it as JSON
text and append it to the result list. This preserves the data for the
LLM while maintaining backward compatibility with existing behavior.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Python: Parse MCP CallToolResult.structuredContent field to prevent tool results returning None

Fixes #3313

* Address review feedback: add default=str to json.dumps and remove .checkpoints/

- Add default=str to json.dumps for structuredContent serialization so
  non-JSON-serializable values (e.g. bytes) degrade gracefully instead
  of raising TypeError
- Remove all .checkpoints/ runtime artifacts from the repository
- Add **/.checkpoints/ to .gitignore to prevent future accidental commits
- Add test for non-serializable structuredContent values

Fixes #3313

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address review feedback for #3313: Python: MCP CallToolResult.structuredContent field is not parsed, causing tool results to return None

---------

Co-authored-by: Copilot <copilot@github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-10 12:51:09 +00:00

342 lines
12 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""Agent runner orchestration for the harness console.
This module provides the HarnessAgentRunner class, which orchestrates agent
invocations with observer lifecycle management. It handles:
- User input dispatch
- Agent streaming with observer notifications
- Follow-up action collection
- Streaming state management
"""
from __future__ import annotations

import asyncio
import inspect
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from agent_framework import Agent, AgentSession

    from .app_state import FollowUpAction
    from .observers.base import ConsoleObserver
    from .state_driver import IUXStateDriver
class HarnessAgentRunner:
"""Orchestrates agent invocations driven by user-input events from the UI.
The component invokes the runner's input handlers (run_turn) directly;
the runner mutates UI state through the supplied IUXStateDriver.
This is a minimal implementation focusing on the core agent loop without
command handling or complex message injection (those can be added later).
"""
def __init__(
self,
agent: Agent,
observers: list[ConsoleObserver],
state_driver: IUXStateDriver,
*,
max_context_window_tokens: int | None = None,
max_output_tokens: int | None = None,
) -> None:
"""Initialize the agent runner.
Args:
agent: The agent to orchestrate.
observers: List of console observers for lifecycle events.
state_driver: The UI state driver for observer updates.
max_context_window_tokens: Optional max context window size for usage display.
max_output_tokens: Optional max output tokens for usage display.
"""
self._agent = agent
self._observers = observers
self._ux = state_driver
self._max_context_window_tokens = max_context_window_tokens
self._max_output_tokens = max_output_tokens
self._input_gate = asyncio.Semaphore(1) # Single turn at a time
async def run_turn(
self,
user_input: str,
session: AgentSession | None = None,
) -> None:
"""Run a single agent turn with the given user input.
Echoes the input, then delegates to the agent loop.
Args:
user_input: The user's input text.
session: Optional agent session for conversation history.
"""
async with self._input_gate:
self._ux.write_user_input_echo(user_input)
from agent_framework import Message
messages = [Message(role="user", contents=[user_input])]
await self._run_agent_loop(messages, session)
async def start_agent_turn(
self,
messages: list,
session: AgentSession | None = None,
) -> None:
"""Resume the agent loop with pre-built messages (from follow-up responses).
Called by the app after the user finishes answering follow-up questions.
If messages is empty, just completes the turn.
Args:
messages: List of Message objects to send to the agent.
session: Optional agent session.
"""
async with self._input_gate:
if not messages:
self._complete_turn()
return
await self._run_agent_loop(messages, session)
async def _run_agent_loop(
self,
messages: list,
session: AgentSession | None,
) -> None:
"""Run the agent loop, re-invoking as needed for follow-up messages.
Loops while there are messages to send. After each stream:
- Collects follow-up actions from observers
- If questions exist → queue them and return (UI will collect answers)
- If only direct messages → loop with those messages
- If nothing → complete the turn
Args:
messages: Initial messages to send.
session: Optional agent session.
"""
next_messages = messages
while next_messages:
# Configure run options
options = self._configure_run_options(session)
# Begin streaming
self._ux.begin_streaming()
self._ux.begin_streaming_output()
self._ux.set_show_spinner(True)
try:
await self._stream_response_messages(next_messages, session, options)
except Exception as ex:
self._ux.append_info_line(
f"❌ Stream error: {ex.__class__.__name__}:\n{ex}",
color="red",
)
# Stop spinner and end streaming output
self._ux.set_show_spinner(False)
# Collect follow-up actions from observers
follow_up_actions = await self._collect_follow_up_actions(session)
# Separate direct messages from questions
has_follow_ups = len(follow_up_actions) > 0
# Write no-text warning if applicable
await self._ux.write_no_text_warning(has_follow_ups)
# Enqueue all follow-up actions
for action in follow_up_actions:
self._ux.enqueue_follow_up_action(action)
# Check if there are pending questions (UI needs user input)
if self._ux.has_pending_questions():
# Pause — the UI will collect answers and call start_agent_turn
return
# No questions — drain any accumulated direct messages and loop
drained = self._ux.take_follow_up_responses()
next_messages = drained if drained else None
self._complete_turn()
def _complete_turn(self) -> None:
"""Complete the current turn (end streaming)."""
self._ux.end_streaming()
def _configure_run_options(
self,
session: AgentSession | None,
) -> dict:
"""Configure run options via observers.
Each observer can modify the options dict to influence agent behavior.
Args:
session: Optional agent session.
Returns:
Options dict for agent.run().
"""
options = {}
for observer in self._observers:
observer.configure_run_options(options, self._agent, session)
return options
async def _stream_response(
self,
user_input: str,
session: AgentSession | None,
options: dict,
) -> None:
"""Stream agent response from a text input and dispatch to observers.
Args:
user_input: The user's input text.
session: Optional agent session.
options: Run options configured by observers.
"""
# Stream response using agent.run(stream=True)
stream = self._agent.run(
user_input,
stream=True,
session=session,
options=options,
)
# Process each update chunk
async for update in stream:
await self._dispatch_update(update, session)
# Extract usage from the final response
self._extract_usage(stream)
async def _stream_response_messages(
self,
messages: list,
session: AgentSession | None,
options: dict,
) -> None:
"""Stream agent response from Message objects and dispatch to observers.
Args:
messages: List of Message objects to send.
session: Optional agent session.
options: Run options configured by observers.
"""
stream = self._agent.run(
messages,
stream=True,
session=session,
options=options,
)
async for update in stream:
await self._dispatch_update(update, session)
self._extract_usage(stream)
def _extract_usage(self, stream) -> None:
"""Extract token usage from a completed stream."""
try:
get_final = getattr(stream, "get_final_response", None)
if not get_final:
return
import inspect
if inspect.iscoroutinefunction(get_final):
return
final_response = get_final()
if final_response is None:
return
usage = getattr(final_response, "usage_details", None)
if not isinstance(usage, dict):
return
input_tokens = usage.get("input_token_count", 0) or 0
output_tokens = usage.get("output_token_count", 0) or 0
if input_tokens or output_tokens:
self._ux.set_usage_text(self._format_usage(input_tokens, output_tokens))
except (AttributeError, TypeError):
pass
async def _dispatch_update(
self,
update, # AgentResponseUpdate
session: AgentSession | None,
) -> None:
"""Dispatch a single update to all observers.
Calls observer lifecycle methods in order:
1. on_response_update (once per update)
2. on_content (for each content item)
3. on_text (if text is present)
Args:
update: The agent response update.
session: Optional agent session.
"""
# on_response_update
for observer in self._observers:
await observer.on_response_update(self._ux, update, self._agent, session)
# on_content for each content item
if hasattr(update, "contents") and update.contents:
for content in update.contents:
for observer in self._observers:
await observer.on_content(self._ux, content, self._agent, session)
# on_text for text chunks
if hasattr(update, "text") and update.text:
for observer in self._observers:
await observer.on_text(self._ux, update.text, self._agent, session)
async def _collect_follow_up_actions(
self,
session: AgentSession | None,
) -> list[FollowUpAction]:
"""Collect follow-up actions from all observers.
Called after streaming completes to gather any follow-up questions
or messages from observers.
Args:
session: Optional agent session.
Returns:
List of follow-up actions from all observers.
"""
actions: list[FollowUpAction] = []
for observer in self._observers:
observer_actions = await observer.on_stream_complete(self._ux, self._agent, session)
if observer_actions:
actions.extend(observer_actions)
return actions
def _format_usage(self, input_tokens: int, output_tokens: int) -> str:
"""Format token counts matching C# harness style: 📊 Tokens — input: X | output: Y | total: Z."""
total_tokens = input_tokens + output_tokens
input_budget = None
if self._max_context_window_tokens and self._max_output_tokens:
input_budget = self._max_context_window_tokens - self._max_output_tokens
return (
f"📊 Tokens — input: {self._format_token_count(input_tokens, input_budget)}"
f" | output: {self._format_token_count(output_tokens, self._max_output_tokens)}"
f" | total: {self._format_token_count(total_tokens, self._max_context_window_tokens)}"
)
@staticmethod
def _format_token_count(count: int, budget: int | None) -> str:
"""Format a token count, optionally showing budget percentage."""
if budget and budget > 0:
pct = count / budget * 100
return f"{count:,}/{budget:,} ({pct:.1f}%)"
return f"{count:,}"