Files
westey bad05a2bdc Python: Harness console for python (#6312)
* Add initial harness console for python

* Add textual to project

* Add planning and approval flows with list selector

* Address PR comments

* Fix list selection bug

* Fix PR #6312 round 2 review comments

- Escape untrusted agent text with rich.markup.escape() in observers
  (text_output, planning_output, reasoning_display) to prevent markup injection
- Remove non-functional 'Always approve' choices from tool_approval.py
  (framework lacks CreateAlwaysApproveToolResponse support)
- Remove textual from root pyproject.toml dev deps (sample-specific)
- Add PEP 723 inline script metadata to harness_research.py
- Narrow except Exception to except NoMatches in list_selection.py

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix build error

* Fix build errors

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-09 05:48:35 +00:00

504 lines
16 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""Tool call formatters for displaying function calls in the harness console.

This module provides formatters that convert raw function call content into
human-readable display strings. Each formatter handles specific tool patterns
(e.g., web_search, todos_*), and the FallbackToolFormatter provides
generic formatting for any unmatched tools.

Usage:
    from harness.console.formatters import build_default_formatters, format_tool_call
    from agent_framework import Content

    call = Content.from_function_call(
        call_id="call_1",
        name="web_search",
        arguments={"query": "Python async"},
    )
    formatters = build_default_formatters()
    result = format_tool_call(formatters, call)  # "web_search (Python async)"
"""
from __future__ import annotations
import contextlib
import json
from abc import ABC, abstractmethod
from typing import Any
from agent_framework import Content
# region Helper Functions
def get_argument_value(call: Content, param_name: str) -> Any:
    """Look up a single named argument on a function call.

    The call's ``arguments`` may be a ready-made dict or a JSON-encoded
    string; both forms are supported.

    Args:
        call: The function call content to inspect.
        param_name: Name of the argument to fetch.

    Returns:
        The argument's value, or None when the arguments are missing, are of
        an unsupported type, do not decode to a JSON object, or do not
        contain ``param_name``.
    """
    raw_args = call.arguments

    # Fast path: arguments already arrived as a mapping.
    if isinstance(raw_args, dict):
        return raw_args.get(param_name)

    # Anything that is neither a dict nor a string (including None) is unusable.
    if not isinstance(raw_args, str):
        return None

    try:
        decoded = json.loads(raw_args)
    except (json.JSONDecodeError, TypeError):
        return None

    # Valid JSON that is not an object (list, number, ...) has no named arguments.
    return decoded.get(param_name) if isinstance(decoded, dict) else None
def as_int_list(value: Any) -> list[int] | None:
    """Convert a list of loosely-typed values into a list of integers.

    Items that are already ints are kept as-is; other items are passed
    through ``int()`` and silently dropped when they cannot be converted.

    Args:
        value: The value to convert (expected to be a list).

    Returns:
        The successfully converted integers in their original order, or None
        if ``value`` is not a list or no item could be converted.
    """
    if not isinstance(value, list):
        return None

    result: list[int] = []
    for item in value:
        if isinstance(item, int):
            result.append(item)
            continue
        # OverflowError covers int(float("inf")): JSON numbers such as 1e999
        # decode to infinity and must not crash the formatter.
        with contextlib.suppress(ValueError, TypeError, OverflowError):
            result.append(int(item))
    return result or None
def as_dict_list(value: Any) -> list[dict[str, Any]] | None:
    """Pick the dict entries out of a list.

    Non-dict items in the list are discarded rather than treated as an error.

    Args:
        value: The value to filter (expected to be a list).

    Returns:
        The dict items in their original order, or None if ``value`` is not a
        list or contains no dicts.
    """
    if not isinstance(value, list):
        return None
    dict_items: list[dict[str, Any]] = [entry for entry in value if isinstance(entry, dict)]
    return dict_items or None
def truncate(text: str, max_length: int) -> str:
    """Shorten a string to a maximum length, marking the cut with an ellipsis.

    Args:
        text: The text to truncate.
        max_length: The maximum number of characters of ``text`` to keep.
            Negative values are treated as 0.

    Returns:
        ``text`` unchanged when it already fits; otherwise its first
        ``max_length`` characters followed by a single ellipsis character
        (U+2026), so the result may be one character longer than
        ``max_length``.
    """
    if len(text) <= max_length:
        return text
    # Clamp so a negative limit cannot turn the slice into "drop from the end".
    kept = text[: max(max_length, 0)]
    return kept + "\u2026"  # horizontal ellipsis
# endregion
# region Base Class
class ToolCallFormatter(ABC):
    """Abstract interface for turning a function call into console display text.

    Implementations first declare whether they handle a call (``can_format``)
    and then supply the detail text shown after the tool name
    (``format_detail``).
    """

    @abstractmethod
    def can_format(self, call: Content) -> bool:
        """Report whether this formatter handles the given function call.

        Args:
            call: The function call content to check.

        Returns:
            True if this formatter should be used; otherwise False.
        """

    @abstractmethod
    def format_detail(self, call: Content) -> str | None:
        """Produce the detail text to show after the tool name.

        Args:
            call: The function call content to format.

        Returns:
            The detail string, or None if only the tool name should be
            displayed.
        """
# endregion
# region Concrete Formatters
class FallbackToolFormatter(ToolCallFormatter):
    """Catch-all formatter for tools not matched by a more specific formatter.

    Displays a generic summary of the tool's scalar arguments. Because it
    matches every call, it should always be placed last in the formatter list.
    """

    def can_format(self, call: Content) -> bool:
        """Always return True - this formatter matches everything."""
        return True

    def format_detail(self, call: Content) -> str | None:
        """Format arguments as generic ``(key: value, ...)`` pairs.

        Only bool, int, float and str values are shown (each truncated to 40
        characters); None values and complex values such as lists or dicts
        are skipped.

        Args:
            call: The function call content to format.

        Returns:
            The parenthesised argument summary, or None when the arguments
            are missing, are not (or do not decode to) a dict, or contain no
            displayable values.
        """
        raw_args = call.arguments
        if isinstance(raw_args, str):
            try:
                args_dict = json.loads(raw_args)
            except (json.JSONDecodeError, TypeError):
                return None
        else:
            args_dict = raw_args

        # A JSON string may decode to a list or scalar; only a mapping has
        # key/value pairs to show. This also rejects None and other types.
        if not isinstance(args_dict, dict) or not args_dict:
            return None

        parts: list[str] = []
        for key, value in args_dict.items():
            # bool must be tested before int because bool is an int subclass.
            if isinstance(value, bool):
                str_value = "true" if value else "false"
            elif isinstance(value, (int, float)):
                str_value = str(value)
            elif isinstance(value, str):
                str_value = value
            else:
                # None and complex types (lists, dicts, ...) are not displayed.
                continue
            parts.append(f"{key}: {truncate(str_value, 40)}")

        return f"({', '.join(parts)})" if parts else None
class WebSearchToolFormatter(ToolCallFormatter):
    """Formatter for the ``web_search`` tool; displays the search query."""

    def can_format(self, call: Content) -> bool:
        """Return True only for calls named ``web_search``."""
        return call.name == "web_search"

    def format_detail(self, call: Content) -> str | None:
        """Return the ``query`` argument in parentheses, or None if it is missing or empty."""
        query = get_argument_value(call, "query")
        if not query:
            return None
        return f"({query})"
class TodoToolFormatter(ToolCallFormatter):
    """Formats todos_* tool calls.

    ``todos_add`` is rendered as an item count followed by a tree of titles;
    ``todos_complete`` and ``todos_remove`` are rendered as a tree with one
    line per ID. Any other todos_* tool shows only its name.
    """

    def can_format(self, call: Content) -> bool:
        """Match any tool whose name starts with ``todos_``."""
        return call.name is not None and call.name.startswith("todos_")

    def format_detail(self, call: Content) -> str | None:
        """Dispatch to the formatter for the specific todos operation.

        Returns:
            The detail string for add/complete/remove calls, or None for any
            other todos_* tool or when the arguments yield nothing to show.
        """
        if call.name == "todos_add":
            return self._format_add_todos(call)
        if call.name == "todos_complete":
            return self._format_complete_todos(call)
        if call.name == "todos_remove":
            return self._format_id_list(call, "ids", "Remove")
        return None

    def _format_add_todos(self, call: Content) -> str | None:
        """Format todos_add as ``(N items)`` followed by a tree of titles.

        Entries of the ``todos`` argument without a non-empty string
        ``title`` are ignored.
        """
        todos = as_dict_list(get_argument_value(call, "todos"))
        if not todos:
            return None

        titles: list[str] = [
            title
            for title in (todo.get("title") for todo in todos)
            if title and isinstance(title, str)
        ]
        if not titles:
            return None

        # Build tree view: a count header, then one branch per title.
        count = len(titles)
        plural = "s" if count != 1 else ""
        lines = [f"({count} item{plural})"]
        for i, title in enumerate(titles):
            connector = "├─" if i < count - 1 else "└─"
            lines.append(f"\n {connector} {title}")
        return "".join(lines)

    def _format_complete_todos(self, call: Content) -> str | None:
        """Format todos_complete as a tree of IDs with optional reasons.

        Entries of the ``items`` argument whose ``id`` is not an int are
        ignored. A non-empty ``reason`` is appended after a dash separator,
        truncated to 80 characters.
        """
        items = as_dict_list(get_argument_value(call, "items"))
        if not items:
            return None

        entries: list[tuple[int, str | None]] = []
        for item in items:
            todo_id = item.get("id")
            if not isinstance(todo_id, int):
                continue
            reason = item.get("reason")
            # Normalise non-string reasons so they can be truncated safely.
            if reason is not None and not isinstance(reason, str):
                reason = str(reason)
            entries.append((todo_id, reason))
        if not entries:
            return None

        # Build tree view.
        last_index = len(entries) - 1
        lines: list[str] = []
        for i, (todo_id, reason) in enumerate(entries):
            connector = "├─" if i < last_index else "└─"
            line = f"\n {connector} Complete #{todo_id}"
            if reason:
                # Separate the reason from the ID; without a separator the
                # two run together (e.g. "Complete #3done").
                line += f" — {truncate(reason, 80)}"
            lines.append(line)
        return "".join(lines)

    def _format_id_list(self, call: Content, param_name: str, verb: str) -> str | None:
        """Format a list of IDs as a tree with a verb (e.g. ``Remove #1``).

        Args:
            call: The function call content to format.
            param_name: Name of the argument holding the list of IDs.
            verb: Word placed before each ``#id``.

        Returns:
            One tree line per ID, or None if no usable IDs were supplied.
        """
        ids = as_int_list(get_argument_value(call, param_name))
        if not ids:
            return None

        last_index = len(ids) - 1
        lines: list[str] = []
        for i, todo_id in enumerate(ids):
            connector = "├─" if i < last_index else "└─"
            lines.append(f"\n {connector} {verb} #{todo_id}")
        return "".join(lines)
class ModeToolFormatter(ToolCallFormatter):
    """Formatter for AgentMode_* tools; displays the target mode of ``AgentMode_Set``."""

    def can_format(self, call: Content) -> bool:
        """Return True for any tool whose name starts with ``AgentMode_``."""
        tool_name = call.name
        return tool_name is not None and tool_name.startswith("AgentMode_")

    def format_detail(self, call: Content) -> str | None:
        """Return the ``mode`` argument in parentheses for ``AgentMode_Set``.

        Every other AgentMode_* tool, and a Set call with a missing or empty
        mode, yields None so only the tool name is displayed.
        """
        if call.name != "AgentMode_Set":
            return None
        mode = get_argument_value(call, "mode")
        if not mode:
            return None
        return f"({mode})"
class BackgroundAgentToolFormatter(ToolCallFormatter):
    """Formats BackgroundAgents_* tool calls with human-readable details
    for task start, continue, wait, result retrieval and clear operations.
    """

    def can_format(self, call: Content) -> bool:
        """Match any tool whose name starts with ``BackgroundAgents_``."""
        return call.name is not None and call.name.startswith("BackgroundAgents_")

    def format_detail(self, call: Content) -> str | None:
        """Dispatch to the formatter for the specific BackgroundAgents operation.

        Returns:
            The detail string for the known operations, or None for any other
            BackgroundAgents_* tool or when the arguments yield nothing to show.
        """
        if call.name == "BackgroundAgents_StartTask":
            return self._format_start_background_task(call)
        if call.name == "BackgroundAgents_WaitForFirstCompletion":
            return self._format_id_list(call, "taskIds", "Wait for")
        if call.name == "BackgroundAgents_GetTaskResults":
            return self._format_single_id(call, "taskId")
        if call.name == "BackgroundAgents_ContinueTask":
            return self._format_continue_task(call)
        if call.name == "BackgroundAgents_ClearCompletedTask":
            return self._format_single_id(call, "taskId")
        return None

    def _format_start_background_task(self, call: Content) -> str | None:
        """Format StartTask as a tree of agent name and quoted description.

        Either part may be absent; None is returned only when both are.
        The description is truncated to 80 characters.
        """
        agent_name = get_argument_value(call, "agentName")
        description = get_argument_value(call, "description")

        entries: list[str] = []
        if agent_name is not None:
            entries.append(f"Agent: {agent_name}")
        if description is not None:
            # Arguments come from the model and are not guaranteed to be
            # strings; coerce so truncate() cannot raise on e.g. a number.
            entries.append(f'"{truncate(str(description), 80)}"')
        if not entries:
            return None

        last_index = len(entries) - 1
        lines: list[str] = []
        for i, entry in enumerate(entries):
            connector = "├─" if i < last_index else "└─"
            lines.append(f"\n {connector} {entry}")
        return "".join(lines)

    def _format_id_list(self, call: Content, param_name: str, verb: str) -> str | None:
        """Format a list of task IDs as a tree with a verb (e.g. ``Wait for #1``).

        Args:
            call: The function call content to format.
            param_name: Name of the argument holding the list of task IDs.
            verb: Text placed before each ``#id``.

        Returns:
            One tree line per ID, or None if no usable IDs were supplied.
        """
        ids = as_int_list(get_argument_value(call, param_name))
        if not ids:
            return None

        last_index = len(ids) - 1
        lines: list[str] = []
        for i, task_id in enumerate(ids):
            connector = "├─" if i < last_index else "└─"
            lines.append(f"\n {connector} {verb} #{task_id}")
        return "".join(lines)

    def _format_single_id(self, call: Content, param_name: str) -> str | None:
        """Format a single integer task ID as ``(task #N)``; None if it is not an int."""
        task_id = get_argument_value(call, param_name)
        if isinstance(task_id, int):
            return f"(task #{task_id})"
        return None

    def _format_continue_task(self, call: Content) -> str | None:
        """Format ContinueTask with the task ID and optional quoted text.

        Returns None when ``taskId`` is not an int. The text, when present
        and non-empty, is truncated to 80 characters.
        """
        task_id = get_argument_value(call, "taskId")
        text = get_argument_value(call, "text")
        if not isinstance(task_id, int):
            return None

        if text:
            # Coerce for the same reason as the StartTask description: a
            # non-string value must not make truncate() raise.
            lines = [
                f"\n ├─ Task #{task_id}",
                f'\n └─ "{truncate(str(text), 80)}"',
            ]
            return "".join(lines)
        return f"\n └─ Task #{task_id}"
class FileMemoryToolFormatter(ToolCallFormatter):
    """Formatter for FileMemory_* tools.

    Shows the file name for read/delete, a tree-view corner line for save,
    and the regex (plus optional file pattern) for search.
    """

    def can_format(self, call: Content) -> bool:
        """Return True for any tool whose name starts with ``FileMemory_``."""
        tool_name = call.name
        return tool_name is not None and tool_name.startswith("FileMemory_")

    def format_detail(self, call: Content) -> str | None:
        """Pick the detail format that matches the FileMemory operation.

        Unrecognised FileMemory_* tools yield None so only the name is shown.
        """
        operation = call.name
        if operation == "FileMemory_SaveFile":
            return self._format_save_file(call)
        if operation == "FileMemory_SearchFiles":
            return self._format_search_files(call)
        if operation in ("FileMemory_ReadFile", "FileMemory_DeleteFile"):
            target = get_argument_value(call, "fileName")
            if not target:
                return None
            return f"({target})"
        return None

    def _format_save_file(self, call: Content) -> str | None:
        """Render SaveFile as a corner line with the file name.

        A marker is appended when a non-empty description accompanies the
        file. Returns None when the file name is missing or empty.
        """
        file_name = get_argument_value(call, "fileName")
        description = get_argument_value(call, "description")
        if not file_name:
            return None
        suffix = " (with description)" if description else ""
        return f"\n └─ {file_name}" + suffix

    def _format_search_files(self, call: Content) -> str | None:
        """Render SearchFiles as ``(/regex/)`` or ``(/regex/ in filePattern)``.

        Returns None when the regex pattern is missing or empty.
        """
        pattern = get_argument_value(call, "regexPattern")
        file_pattern = get_argument_value(call, "filePattern")
        if not pattern:
            return None
        if not file_pattern:
            return f"(/{pattern}/)"
        return f"(/{pattern}/ in {file_pattern})"
# endregion
# region Public API Functions
def format_tool_call(formatters: list[ToolCallFormatter], call: Content) -> str:
    """Render a tool call with the first formatter that accepts it.

    The result is "{toolName} {detail}" when the chosen formatter produces a
    detail string, and just "{toolName}" when it returns None or when no
    formatter matches. A call without a name is shown as "Unknown".

    Args:
        formatters: Formatters to try, in priority order.
        call: The function call content to format.

    Returns:
        Formatted string representation of the tool call.
    """
    tool_name = call.name or "Unknown"

    # Only the first formatter that claims the call is consulted.
    chosen = next((candidate for candidate in formatters if candidate.can_format(call)), None)
    if chosen is None:
        return tool_name

    detail = chosen.format_detail(call)
    if detail is None:
        return tool_name
    return f"{tool_name} {detail}"
def build_default_formatters() -> list[ToolCallFormatter]:
    """Build a fresh list containing every built-in tool call formatter.

    The tool-specific formatters come first and the catch-all
    FallbackToolFormatter is always last. Callers can combine the returned
    list with their own formatters.

    Returns:
        A new list of all built-in tool call formatters.
    """
    specific_formatter_types = (
        TodoToolFormatter,
        ModeToolFormatter,
        BackgroundAgentToolFormatter,
        FileMemoryToolFormatter,
        WebSearchToolFormatter,
    )
    formatters: list[ToolCallFormatter] = [formatter_type() for formatter_type in specific_formatter_types]
    # The fallback matches everything, so it must stay at the end.
    formatters.append(FallbackToolFormatter())
    return formatters
# endregion