Python: Harness console for python (#6312)

* Add initial harness console for python

* Add textual to project

* Add planning and approval flows with list selector

* Address PR comments

* Fix list selection bug

* Fix PR #6312 round 2 review comments

- Escape untrusted agent text with rich.markup.escape() in observers
  (text_output, planning_output, reasoning_display) to prevent markup injection
- Remove non-functional 'Always approve' choices from tool_approval.py
  (framework lacks CreateAlwaysApproveToolResponse support)
- Remove textual from root pyproject.toml dev deps (sample-specific)
- Add PEP 723 inline script metadata to harness_research.py
- Narrow except Exception to except NoMatches in list_selection.py

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix build error

* Fix build errors

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
westey
2026-06-09 06:48:35 +01:00
committed by GitHub
parent 7e0767a0a0
commit bad05a2bdc
36 changed files with 4735 additions and 74 deletions
@@ -0,0 +1,94 @@
# Harness Console
A Textual-based terminal UI for running and observing AI agents built with the Agent Framework.
## Quick Start
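```python
import asyncio

from console import run_agent_async, build_default_observers


async def main() -> None:
    # my_agent and my_session are placeholders for your own Agent and AgentSession.
    await run_agent_async(
        agent=my_agent,
        session=my_session,
        observers=build_default_observers(),
    )


asyncio.run(main())
```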
See [`harness_research.py`](../harness_research.py) for a complete example.
## Package Structure
```
console/
├── __init__.py               # Public API exports
├── harness_console.py        # run_agent_async() entry point
├── app.py                    # HarnessApp (Textual application)
├── app_state.py              # HarnessAppState, enums, data types
├── agent_runner.py           # HarnessAgentRunner (streaming orchestration)
├── state_driver.py           # IUXStateDriver protocol
├── textual_state_driver.py   # Textual implementation of IUXStateDriver
├── formatters.py             # Tool call formatters
├── observers/                # Lifecycle observers
│   ├── base.py               # ConsoleObserver abstract base
│   ├── text_output.py        # Streaming text display
│   ├── tool_call_display.py  # Tool call formatting
│   ├── tool_approval.py      # User approval for tool calls
│   ├── error_display.py      # Error messages
│   ├── usage_display.py      # Token usage tracking
│   └── reasoning_display.py  # Reasoning/thinking blocks
├── components/               # Textual UI widgets
│   ├── scroll_panel.py       # Conversation history
│   ├── text_input.py         # User text input
│   ├── list_selection.py     # Multiple choice selector
│   ├── agent_status.py       # Spinner + usage display
│   └── agent_mode_help.py    # Mode indicator + help text
└── commands/                 # Slash command handlers
    ├── base.py               # CommandHandler abstract base
    ├── exit_handler.py       # /exit
    ├── mode_handler.py       # /mode [plan|execute]
    ├── todo_handler.py       # /todos
    └── session_handler.py    # /session-export, /session-import
```
## Public API
| Export | Description |
|--------|-------------|
| `run_agent_async` | Main entry point — runs the Textual app with an agent |
| `build_default_observers` | Factory for the standard observer set |
| `build_observers_with_planning` | Factory for the observer set including planning |
| `build_default_command_handlers` | Factory for slash command handlers |
| `ConsoleObserver` | Base class for custom observers |
| `ToolCallFormatter` | Base class for custom tool formatters |
| `CommandHandler` | Base class for custom slash commands |
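A custom observer overrides only the lifecycle hooks it cares about. The sketch below is self-contained, so `ConsoleObserver` here is a stand-in rather than the real base class, and `CharCountObserver` and the plain list used as `ux` are hypothetical. The hook names and argument order (`on_text`, `on_stream_complete`) follow the calls made in `agent_runner.py`; everything else is an assumption.

```python
import asyncio


class ConsoleObserver:
    """Stand-in for the package's base class (assumption: no-op default hooks)."""

    async def on_text(self, ux, text, agent, session):
        pass

    async def on_stream_complete(self, ux, agent, session):
        return []


class CharCountObserver(ConsoleObserver):
    """Hypothetical observer that counts streamed characters."""

    def __init__(self):
        self.chars = 0

    async def on_text(self, ux, text, agent, session):
        # Called once per text chunk while the agent streams.
        self.chars += len(text)

    async def on_stream_complete(self, ux, agent, session):
        # Report the total once the stream ends; return no follow-up actions.
        ux.append(f"streamed {self.chars} characters")
        return []


async def demo():
    ux = []  # stand-in for the state driver: just collects info lines
    observer = CharCountObserver()
    for chunk in ["Hello, ", "world"]:
        await observer.on_text(ux, chunk, agent=None, session=None)
    await observer.on_stream_complete(ux, agent=None, session=None)
    return ux


info_lines = asyncio.run(demo())
```

With the real package, the observer would be appended to the list returned by `build_default_observers()` and would call methods on the state driver instead of appending to a list.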
## Architecture
The console follows a unidirectional data flow:
```
AgentRunner → Observers → StateDriver → AppState → Textual UI
User Input (app.py)
```
- **AgentRunner** streams responses from the agent and dispatches events to observers.
- **Observers** process events (text chunks, tool calls, errors) and update the state driver.
- **StateDriver** (`IUXStateDriver`) mutates `HarnessAppState` and notifies the UI.
- **Textual App** reads state and syncs widgets on each notification.
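The flow described above can be sketched with stand-in classes. None of these are the package's real types: `AppState`, `StateDriver`, `EchoObserver`, and `run_stream` are simplified, hypothetical versions that only show the direction in which data moves, with a callback standing in for the Textual UI.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class AppState:
    """Stand-in for HarnessAppState: only the output entries."""

    output_entries: list[str] = field(default_factory=list)


class StateDriver:
    """Stand-in driver: mutates state, then notifies the UI."""

    def __init__(self, state: AppState, on_state_changed: Callable[[], None]) -> None:
        self._state = state
        self._notify = on_state_changed

    def append_info_line(self, text: str) -> None:
        self._state.output_entries.append(text)
        self._notify()


class EchoObserver:
    """Stand-in observer: forwards each text chunk to the driver."""

    def on_text(self, driver: StateDriver, text: str) -> None:
        driver.append_info_line(text)


def run_stream(chunks: list[str], observers: list[EchoObserver], driver: StateDriver) -> None:
    """Stand-in runner: dispatches every chunk to every observer."""
    for chunk in chunks:
        for observer in observers:
            observer.on_text(driver, chunk)


state = AppState()
rendered: list[list[str]] = []  # snapshot of what the "UI" saw on each notification
driver = StateDriver(state, on_state_changed=lambda: rendered.append(list(state.output_entries)))
run_stream(["plan", "done"], [EchoObserver()], driver)
```

The UI never writes to the state directly in this sketch; it only reads a snapshot when notified, which is the property the unidirectional design relies on.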
### Key Design Choices
| Concern | Approach |
|---------|----------|
| Rendering | Textual widgets + Rich markup (no manual ANSI) |
| State | Single `HarnessAppState` dataclass, mutated by driver |
| Streaming text | Truncate-and-rewrite on RichLog for flicker-free updates |
| Extensibility | Custom observers, formatters, and commands via base classes |
| Follow-up questions | Observer returns `FollowUpQuestion` → UI shows prompt/choices |
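The follow-up question row can be illustrated with a small, self-contained sketch. The `FollowUpQuestion` below is a stand-in that keeps the two fields defined in `app_state.py` (`prompt` and `continuation`); the `approve_tool` continuation, the list used as the state driver, and the plain strings used in place of `Message` objects are all hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class FollowUpQuestion:
    """Stand-in with the same two fields as the package's dataclass."""

    prompt: str
    continuation: Callable[[str, list], Awaitable[Optional[str]]]


async def approve_tool(answer: str, ux: list) -> Optional[str]:
    """Hypothetical continuation: turn the user's answer into the next message."""
    ux.append(f"You chose: {answer}")
    return "tool approved" if answer == "Approve" else None


async def demo() -> tuple:
    ux = []  # stand-in for the state driver
    pending = [FollowUpQuestion("Run the tool?", approve_tool)]
    responses = []
    # The UI answers the head of the queue, then advances to the next question.
    while pending:
        question = pending.pop(0)
        message = await question.continuation("Approve", ux)
        if message is not None:
            responses.append(message)
    return ux, responses


ux_lines, next_messages = asyncio.run(demo())
```

Once the queue is empty, the accumulated responses are what the app would hand back to the runner to resume the agent turn.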
## Dependencies
- `textual` — TUI framework
- `rich` — Text formatting
- `agent-framework` — Core agent framework
@@ -0,0 +1,27 @@
# Copyright (c) Microsoft. All rights reserved.
"""Harness Console - A Textual-based TUI for AI agent interactions.
This package provides a rich terminal interface for running and observing
AI agents, with streaming output, tool call display, follow-up questions,
and token usage tracking.
"""
from .commands import CommandHandler, build_default_command_handlers
from .formatters import ToolCallFormatter
from .harness_console import run_agent_async
from .observers import (
ConsoleObserver,
build_default_observers,
build_observers_with_planning,
)
__all__ = [
"CommandHandler",
"ConsoleObserver",
"ToolCallFormatter",
"build_default_command_handlers",
"build_default_observers",
"build_observers_with_planning",
"run_agent_async",
]
@@ -0,0 +1,343 @@
# Copyright (c) Microsoft. All rights reserved.
"""Agent runner orchestration for the harness console.
This module provides the HarnessAgentRunner class, which orchestrates agent
invocations with observer lifecycle management. It handles:
- User input dispatch
- Agent streaming with observer notifications
- Follow-up action collection
- Streaming state management
"""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from agent_framework import Agent, AgentSession
from .app_state import FollowUpAction
from .observers.base import ConsoleObserver
from .state_driver import IUXStateDriver
class HarnessAgentRunner:
"""Orchestrates agent invocations driven by user-input events from the UI.
The component invokes the runner's input handlers (run_turn) directly;
the runner mutates UI state through the supplied IUXStateDriver.
This is a minimal implementation focusing on the core agent loop without
command handling or complex message injection (those can be added later).
"""
def __init__(
self,
agent: Agent,
observers: list[ConsoleObserver],
state_driver: IUXStateDriver,
*,
max_context_window_tokens: int | None = None,
max_output_tokens: int | None = None,
) -> None:
"""Initialize the agent runner.
Args:
agent: The agent to orchestrate.
observers: List of console observers for lifecycle events.
state_driver: The UI state driver for observer updates.
max_context_window_tokens: Optional max context window size for usage display.
max_output_tokens: Optional max output tokens for usage display.
"""
self._agent = agent
self._observers = observers
self._ux = state_driver
self._max_context_window_tokens = max_context_window_tokens
self._max_output_tokens = max_output_tokens
self._input_gate = asyncio.Semaphore(1) # Single turn at a time
async def run_turn(
self,
user_input: str,
session: AgentSession | None = None,
) -> None:
"""Run a single agent turn with the given user input.
Echoes the input, then delegates to the agent loop.
Args:
user_input: The user's input text.
session: Optional agent session for conversation history.
"""
async with self._input_gate:
self._ux.write_user_input_echo(user_input)
from agent_framework import Message
messages = [Message(role="user", contents=[user_input])]
await self._run_agent_loop(messages, session)
async def start_agent_turn(
self,
messages: list,
session: AgentSession | None = None,
) -> None:
"""Resume the agent loop with pre-built messages (from follow-up responses).
Called by the app after the user finishes answering follow-up questions.
If messages is empty, just completes the turn.
Args:
messages: List of Message objects to send to the agent.
session: Optional agent session.
"""
async with self._input_gate:
if not messages:
self._complete_turn()
return
await self._run_agent_loop(messages, session)
async def _run_agent_loop(
self,
messages: list,
session: AgentSession | None,
) -> None:
"""Run the agent loop, re-invoking as needed for follow-up messages.
Loops while there are messages to send. After each stream:
- Collects follow-up actions from observers
- If questions exist → queue them and return (UI will collect answers)
- If only direct messages → loop with those messages
- If nothing → complete the turn
Args:
messages: Initial messages to send.
session: Optional agent session.
"""
next_messages = messages
while next_messages:
# Configure run options
options = self._configure_run_options(session)
# Begin streaming
self._ux.begin_streaming()
self._ux.begin_streaming_output()
self._ux.set_show_spinner(True)
try:
await self._stream_response_messages(next_messages, session, options)
except Exception as ex:
self._ux.append_info_line(
f"❌ Stream error: {ex.__class__.__name__}:\n{ex}",
color="red",
)
# Stop the spinner once the stream has finished or failed
self._ux.set_show_spinner(False)
# Collect follow-up actions from observers
follow_up_actions = await self._collect_follow_up_actions(session)
# Note whether any observer produced follow-up actions
has_follow_ups = len(follow_up_actions) > 0
# Write no-text warning if applicable
await self._ux.write_no_text_warning(has_follow_ups)
# Enqueue all follow-up actions
for action in follow_up_actions:
self._ux.enqueue_follow_up_action(action)
# Check if there are pending questions (UI needs user input)
if self._ux.has_pending_questions():
# Pause — the UI will collect answers and call start_agent_turn
return
# No questions — drain any accumulated direct messages and loop
drained = self._ux.take_follow_up_responses()
next_messages = drained if drained else None
self._complete_turn()
def _complete_turn(self) -> None:
"""Complete the current turn (end streaming)."""
self._ux.end_streaming()
def _configure_run_options(
self,
session: AgentSession | None,
) -> dict:
"""Configure run options via observers.
Each observer can modify the options dict to influence agent behavior.
Args:
session: Optional agent session.
Returns:
Options dict for agent.run().
"""
options = {}
for observer in self._observers:
observer.configure_run_options(options, self._agent, session)
return options
async def _stream_response(
self,
user_input: str,
session: AgentSession | None,
options: dict,
) -> None:
"""Stream agent response from a text input and dispatch to observers.
Args:
user_input: The user's input text.
session: Optional agent session.
options: Run options configured by observers.
"""
# Stream response using agent.run(stream=True)
stream = self._agent.run(
user_input,
stream=True,
session=session,
options=options,
)
# Process each update chunk
async for update in stream:
await self._dispatch_update(update, session)
# Extract usage from the final response
self._extract_usage(stream)
async def _stream_response_messages(
self,
messages: list,
session: AgentSession | None,
options: dict,
) -> None:
"""Stream agent response from Message objects and dispatch to observers.
Args:
messages: List of Message objects to send.
session: Optional agent session.
options: Run options configured by observers.
"""
stream = self._agent.run(
messages,
stream=True,
session=session,
options=options,
)
async for update in stream:
await self._dispatch_update(update, session)
self._extract_usage(stream)
def _extract_usage(self, stream) -> None:
"""Extract token usage from a completed stream."""
try:
get_final = getattr(stream, "get_final_response", None)
if not get_final:
return
import inspect
if inspect.iscoroutinefunction(get_final):
return
final_response = get_final()
if final_response is None:
return
usage = getattr(final_response, "usage_details", None)
if not isinstance(usage, dict):
return
input_tokens = usage.get("input_token_count", 0) or 0
output_tokens = usage.get("output_token_count", 0) or 0
if input_tokens or output_tokens:
self._ux.set_usage_text(self._format_usage(input_tokens, output_tokens))
except (AttributeError, TypeError):
pass
async def _dispatch_update(
self,
update, # AgentResponseUpdate
session: AgentSession | None,
) -> None:
"""Dispatch a single update to all observers.
Calls observer lifecycle methods in order:
1. on_response_update (once per update)
2. on_content (for each content item)
3. on_text (if text is present)
Args:
update: The agent response update.
session: Optional agent session.
"""
# on_response_update
for observer in self._observers:
await observer.on_response_update(self._ux, update, self._agent, session)
# on_content for each content item
if hasattr(update, "contents") and update.contents:
for content in update.contents:
for observer in self._observers:
await observer.on_content(self._ux, content, self._agent, session)
# on_text for text chunks
if hasattr(update, "text") and update.text:
for observer in self._observers:
await observer.on_text(self._ux, update.text, self._agent, session)
async def _collect_follow_up_actions(
self,
session: AgentSession | None,
) -> list[FollowUpAction]:
"""Collect follow-up actions from all observers.
Called after streaming completes to gather any follow-up questions
or messages from observers.
Args:
session: Optional agent session.
Returns:
List of follow-up actions from all observers.
"""
actions: list[FollowUpAction] = []
for observer in self._observers:
observer_actions = await observer.on_stream_complete(
self._ux, self._agent, session
)
if observer_actions:
actions.extend(observer_actions)
return actions
def _format_usage(self, input_tokens: int, output_tokens: int) -> str:
"""Format token counts matching C# harness style: 📊 Tokens — input: X | output: Y | total: Z."""
total_tokens = input_tokens + output_tokens
input_budget = None
if self._max_context_window_tokens and self._max_output_tokens:
input_budget = self._max_context_window_tokens - self._max_output_tokens
return (
f"📊 Tokens — input: {self._format_token_count(input_tokens, input_budget)}"
f" | output: {self._format_token_count(output_tokens, self._max_output_tokens)}"
f" | total: {self._format_token_count(total_tokens, self._max_context_window_tokens)}"
)
@staticmethod
def _format_token_count(count: int, budget: int | None) -> str:
"""Format a token count, optionally showing budget percentage."""
if budget and budget > 0:
pct = count / budget * 100
return f"{count:,}/{budget:,} ({pct:.1f}%)"
return f"{count:,}"
@@ -0,0 +1,541 @@
# Copyright (c) Microsoft. All rights reserved.
"""Main Textual application for the harness console.
This module provides the HarnessApp - the main Textual application that
composes all UI components and integrates with the agent runner.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from textual import on, work
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Container, Vertical
from textual.css.query import NoMatches
from textual.widgets import Input, Static
from .app_state import (
BottomPanelMode,
HarnessAppState,
OutputEntryType,
)
from .components import (
AgentModeAndHelp,
AgentStatus,
HarnessListSelection,
HarnessScrollPanel,
HarnessTextInput,
PromptRule,
)
from .textual_state_driver import HarnessConsoleUXStateDriver
if TYPE_CHECKING:
from agent_framework import Agent, AgentSession
from .agent_runner import HarnessAgentRunner
from .commands import CommandHandler
from .observers.base import ConsoleObserver
class HarnessApp(App[None]):
"""Main Textual application for the harness console.
Composes the scroll panel (conversation history), status bar (spinner, usage),
mode/help display, and bottom panel (text input, list selection, or streaming
indicator). Routes user input to the agent runner.
"""
CSS = """
Screen {
background: $background;
}
#scroll-panel {
height: 1fr;
padding: 0 1;
background: transparent;
}
#bottom-panel {
height: auto;
}
#text-input-container {
height: 1;
display: block;
}
#list-selection-container {
height: auto;
max-height: 12;
display: none;
}
#streaming-indicator {
height: 1;
display: none;
}
#status-bar {
height: 1;
}
#mode-help {
height: 1;
}
#top-rule {
height: 1;
}
#bottom-rule {
height: 1;
}
#separator-rule {
height: 1;
}
#text-input {
height: 1;
}
.hidden {
display: none;
}
.visible {
display: block;
}
.input-field {
border: none;
padding: 0;
min-height: 1;
height: 1;
background: transparent;
}
.input-field:focus {
border: none;
background: transparent;
}
.prompt-container {
height: 1;
}
.prompt-label {
width: 2;
min-width: 2;
height: 1;
}
"""
BINDINGS = [
Binding("ctrl+c", "quit", "Quit", show=False),
Binding("ctrl+q", "quit", "Quit", show=False),
]
def __init__(
self,
agent: Agent,
observers: list[ConsoleObserver],
session: AgentSession | None = None,
mode_colors: dict[str, str] | None = None,
initial_mode: str | None = None,
placeholder: str = "Type a message and press Enter...",
title: str = "Harness Console",
max_context_window_tokens: int | None = None,
max_output_tokens: int | None = None,
command_handlers: list[CommandHandler] | None = None,
) -> None:
"""Initialize the harness console application.
Args:
agent: The agent to run.
observers: List of console observers.
session: Optional agent session.
mode_colors: Optional mode color mapping.
initial_mode: Initial agent mode.
placeholder: Input placeholder text.
title: Application title.
max_context_window_tokens: Optional max context window tokens for usage display.
max_output_tokens: Optional max output tokens for usage display.
command_handlers: Optional list of command handlers. If None, auto-detected.
"""
super().__init__()
self.title = title
self._agent = agent
self._observers = observers
self._session = session
self._mode_colors = mode_colors
self._initial_mode = initial_mode
self._placeholder = placeholder
self._max_context_window_tokens = max_context_window_tokens
self._max_output_tokens = max_output_tokens
# Build command handlers
if command_handlers is None:
from .commands import build_default_command_handlers
self._command_handlers = build_default_command_handlers(
agent, mode_colors=mode_colors
)
else:
self._command_handlers = command_handlers
# Compute help text from command handlers
help_parts = [
help_entry
for h in self._command_handlers
if (help_entry := h.get_help_text()) is not None
]
help_text = ", ".join(help_parts) if help_parts else None
# State and driver
self._app_state = HarnessAppState(
placeholder=placeholder,
mode_text=initial_mode,
help_text=help_text,
)
self._ux_driver = HarnessConsoleUXStateDriver(
app_state=self._app_state,
on_state_changed=self._on_state_changed,
mode_colors=mode_colors,
)
# Agent runner (created after init)
self._runner: HarnessAgentRunner | None = None
@property
def ux_driver(self) -> HarnessConsoleUXStateDriver:
"""Get the UX state driver."""
return self._ux_driver
@property
def runner(self) -> HarnessAgentRunner | None:
"""Get the agent runner."""
return self._runner
def compose(self) -> ComposeResult:
"""Compose the application layout."""
with Vertical():
# Main scroll panel for conversation history
yield HarnessScrollPanel(id="scroll-panel")
# Blank line separating scroll content from status area
yield Static(" ", id="separator-rule")
# Status bar (spinner + usage)
yield AgentStatus(id="status-bar")
# Top rule (mode-colored)
yield PromptRule(id="top-rule")
# Bottom panel - switches between text input, list selection, streaming
with Container(id="bottom-panel"):
# Text input (default)
with Container(id="text-input-container"):
text_input = HarnessTextInput(id="text-input")
text_input.placeholder = self._placeholder
yield text_input
# List selection (for follow-up questions)
with Container(id="list-selection-container"):
yield HarnessListSelection(id="list-selection")
# Bottom rule (mode-colored)
yield PromptRule(id="bottom-rule")
# Mode and help
yield AgentModeAndHelp(id="mode-help")
def on_mount(self) -> None:
"""Initialize after mount."""
# Create agent runner now that everything is set up
from .agent_runner import HarnessAgentRunner
self._runner = HarnessAgentRunner(
agent=self._agent,
observers=self._observers,
state_driver=self._ux_driver,
max_context_window_tokens=self._max_context_window_tokens,
max_output_tokens=self._max_output_tokens,
)
# Set initial mode
if self._initial_mode:
self._ux_driver.current_mode = self._initial_mode
# Focus the text input
try:
text_input = self.query_one("#text-input", HarnessTextInput)
text_input.focus_input()
except NoMatches:
pass
# Set initial rule colors and mode display
self._sync_mode_help()
# --- Event handlers ---
@on(HarnessTextInput.Submitted)
def on_text_submitted(self, event: HarnessTextInput.Submitted) -> None:
"""Handle text input submission."""
text = event.value.strip()
if not text:
return
if self._app_state.pending_questions:
# Answer the current follow-up question
self._handle_follow_up_answer(text)
elif self._app_state.mode == BottomPanelMode.STREAMING:
# Input during streaming (message injection placeholder)
pass
elif text.startswith("/"):
# Try command handlers
self._try_command_handlers(text)
else:
# Normal user input - run agent turn
self._run_agent_turn(text)
@work(exclusive=True, thread=False)
async def _try_command_handlers(self, text: str) -> None:
"""Try each command handler; fall through to agent if none match."""
session = self._session
if session is None:
# No session — fall through to agent turn
self._run_agent_turn(text)
return
for handler in self._command_handlers:
if await handler.try_handle(text, session, self._ux_driver):
# Command handled — check for shutdown/session swap signals
self._process_command_signals()
return
# No handler matched — treat as normal agent input
self._run_agent_turn(text)
def _process_command_signals(self) -> None:
"""Check and process signals set by command handlers."""
if self._app_state.shutdown_requested:
self.exit()
return
if self._app_state.replaced_session is not None:
self._session = self._app_state.replaced_session # type: ignore[assignment]
self._app_state.replaced_session = None
self._ux_driver.append_info_line("Session replaced.")
self._sync_ui_from_state()
@on(HarnessListSelection.Selected)
def on_list_selected(self, event: HarnessListSelection.Selected) -> None:
"""Handle list selection."""
self._handle_follow_up_answer(event.value)
# --- Agent turn ---
@work(exclusive=True, thread=False)
async def _run_agent_turn(self, text: str) -> None:
"""Run an agent turn in a background worker."""
if self._runner is None:
return
await self._runner.run_turn(text, session=self._session)
# After turn completes, check for follow-up questions
self._sync_ui_from_state()
# --- Follow-up question handling ---
@work(exclusive=True, thread=False)
async def _handle_follow_up_answer(self, answer: str) -> None:
"""Handle a user's answer to a follow-up question."""
if not self._app_state.pending_questions:
return
question = self._app_state.pending_questions[0]
# Call the continuation
result_message = await question.continuation(answer, self._ux_driver)
# Add result to accumulated responses
if result_message is not None:
self._ux_driver.add_follow_up_response(result_message)
# Advance to next question
self._ux_driver.advance_follow_up_question()
# If no more questions, resume the agent with accumulated responses
if not self._app_state.pending_questions:
responses = self._ux_driver.take_follow_up_responses()
if responses and self._runner:
await self._runner.start_agent_turn(responses, session=self._session)
self._sync_ui_from_state()
# --- State synchronization ---
def _on_state_changed(self) -> None:
"""Called by state driver when state changes - schedule UI sync.
Agent turns run in @work(thread=False) workers, so state changes happen
on the main event loop. call_later defers the UI sync until the messages
already queued on the app have been processed.
"""
self.call_later(self._sync_ui_from_state)
def _sync_ui_from_state(self) -> None:
"""Synchronize UI components with current application state."""
state = self._app_state
# Update scroll panel with new entries
self._sync_scroll_panel()
# Update bottom panel mode
self._sync_bottom_panel(state.mode)
# Hide status bar and mode/help during list selection (matching C#)
is_list_mode = state.mode == BottomPanelMode.LIST_SELECTION
self._sync_chrome_visibility(not is_list_mode)
# Update status bar
self._sync_status_bar()
# Update mode/help display
self._sync_mode_help()
def _sync_scroll_panel(self) -> None:
"""Sync the scroll panel with output entries."""
try:
panel = self.query_one("#scroll-panel", HarnessScrollPanel)
except NoMatches:
return
entries = self._app_state.output_entries
rendered_count = getattr(self, "_rendered_entry_count", 0)
if rendered_count < len(entries):
# There are new entries to render
for entry in entries[rendered_count:]:
if entry.type == OutputEntryType.STREAMING_TEXT:
panel.set_streaming_entry(entry)
else:
# End any active streaming before appending other entry types
panel.end_streaming()
panel.append_entry(entry)
self._rendered_entry_count = len(entries)
elif rendered_count == len(entries) and entries:
# Same count — check if the last entry is a streaming entry that was mutated
last_entry = entries[-1]
if last_entry.type == OutputEntryType.STREAMING_TEXT:
panel.set_streaming_entry(last_entry)
def _sync_bottom_panel(self, mode: BottomPanelMode) -> None:
"""Switch the bottom panel between text input, list, and streaming."""
try:
text_container = self.query_one("#text-input-container")
list_container = self.query_one("#list-selection-container")
except NoMatches:
return
if mode == BottomPanelMode.TEXT_INPUT:
text_container.display = True
list_container.display = False
# Restore focus to text input
try:
text_input = self.query_one("#text-input", HarnessTextInput)
text_input.focus_input()
except NoMatches:
pass
elif mode == BottomPanelMode.LIST_SELECTION:
text_container.display = False
list_container.display = True
self._sync_list_selection()
elif mode == BottomPanelMode.STREAMING:
text_container.display = True
list_container.display = False
def _sync_list_selection(self) -> None:
"""Sync the list selection widget with state."""
try:
list_widget = self.query_one("#list-selection", HarnessListSelection)
except NoMatches:
return
state = self._app_state
list_widget.title = state.list_selection_title or ""
list_widget.options = list(state.list_selection_options)
list_widget.allow_custom_text = state.list_selection_custom_text_placeholder is not None
if state.list_selection_custom_text_placeholder:
try:
custom_input = list_widget.query_one("#custom-input", Input)
custom_input.placeholder = state.list_selection_custom_text_placeholder
except NoMatches:
pass
# Focus the option list so keyboard navigation works immediately
list_widget.focus_list()
def _sync_status_bar(self) -> None:
"""Sync the status bar with state."""
try:
status = self.query_one("#status-bar", AgentStatus)
except NoMatches:
return
state = self._app_state
status.show_spinner = state.show_spinner
status.usage_text = state.usage_text or ""
def _sync_mode_help(self) -> None:
"""Sync the mode/help display and rule colors with state."""
try:
mode_help = self.query_one("#mode-help", AgentModeAndHelp)
except NoMatches:
return
state = self._app_state
mode_help.mode = state.mode_text or ""
mode_help.mode_color = state.mode_color or "blue"
mode_help.help_text = state.help_text or ""
# Sync rule colors to match mode
color = state.mode_color or "cyan"
try:
top_rule = self.query_one("#top-rule", PromptRule)
top_rule.rule_color = color
except NoMatches:
pass
try:
bottom_rule = self.query_one("#bottom-rule", PromptRule)
bottom_rule.rule_color = color
except NoMatches:
pass
def _sync_chrome_visibility(self, visible: bool) -> None:
"""Show or hide chrome elements (status bar, mode/help).
During list selection mode, these are hidden to give more vertical
space to the scroll panel and list picker.
Args:
visible: Whether chrome elements should be visible.
"""
import contextlib
with contextlib.suppress(NoMatches):
self.query_one("#status-bar", AgentStatus).display = visible
with contextlib.suppress(NoMatches):
self.query_one("#mode-help", AgentModeAndHelp).display = visible
# --- Rendering count tracking ---
_rendered_entry_count: int = 0
@@ -0,0 +1,260 @@
# Copyright (c) Microsoft. All rights reserved.
"""Application state and core data types for the harness console.
This module defines enums, dataclasses, follow-up action types, and the
HarnessAppState dataclass which holds all UI state that may change during
application execution. The state driver mutates this state to coordinate
between the agent runner and the Textual UI components.
"""
from __future__ import annotations
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from agent_framework import Message
from .state_driver import IUXStateDriver
# region Enums
class OutputEntryType(Enum):
"""Type of output entry in the console conversation."""
USER_INPUT = "user_input"
"""User input echo (e.g., 'You: hello')."""
STREAMING_TEXT = "streaming_text"
"""In-progress streaming text from the agent (accumulated chunk by chunk)."""
INFO_LINE = "info_line"
"""Informational line (tool calls, errors, usage, approval requests, etc.)."""
STREAM_FOOTER = "stream_footer"
"""Stream footer (e.g., '(no text response from agent)')."""
PENDING_MESSAGE = "pending_message"
"""Pending injected message notification."""
class BottomPanelMode(Enum):
"""Mode of the bottom panel UI."""
TEXT_INPUT = "text_input"
"""Show text input for user messages."""
LIST_SELECTION = "list_selection"
"""Show choice list for user selection."""
STREAMING = "streaming"
"""Show 'streaming...' indicator while agent is generating."""
# endregion
# region Output Entry
@dataclass
class OutputEntry:
"""A single output entry in the console conversation history.
Used internally by the state driver to track conversation output,
including streaming text, tool calls, errors, and user input echoes.
Args:
type: The type of output entry.
text: The text content of the entry.
color: Optional Rich color string (e.g., "cyan", "red", "dim").
"""
type: OutputEntryType
text: str
color: str | None = None
# endregion
# region Follow-Up Actions
class FollowUpAction:
"""Base class for follow-up actions returned by observers.
Follow-up actions describe either a question to ask the user
(via FollowUpQuestion subclasses) or a message to add directly
to the next agent input (FollowUpMessage).
"""
pass
@dataclass
class FollowUpQuestion(FollowUpAction):
"""A question to ask the user with a continuation.
The continuation delegate is invoked with the user's answer and the
UX state driver, and returns an optional Message to add to the next
agent invocation.
Args:
prompt: The question text shown to the user.
continuation: Async function invoked with the user's answer and state driver.
Returns an optional Message to add to the next agent input.
"""
prompt: str
continuation: Callable[[str, IUXStateDriver], Awaitable[Message | None]]
@dataclass
class TextFollowUpQuestion(FollowUpQuestion):
"""A free-form text question.
The user may type any response. This subclass adds no fields or
constraints to FollowUpQuestion.
"""
pass
@dataclass
class ChoiceFollowUpQuestion(FollowUpQuestion):
"""A multiple choice question.
The user picks from the provided choices, with an optional ability to
enter custom text when allow_custom_text is True.
Args:
prompt: The question text shown to the user.
choices: List of pre-defined choices.
allow_custom_text: If True, the user may type a custom response in
addition to the listed choices.
continuation: Async function invoked with the user's choice/text and
state driver. Returns an optional Message to add to the next agent input.
"""
choices: list[str]
allow_custom_text: bool = False
@dataclass
class FollowUpMessage(FollowUpAction):
"""A message to add directly to the next agent invocation without prompting.
Used when an observer wants to inject a message into the conversation
without user interaction (e.g., automatic tool results, system messages).
Args:
message: The Message to add to the conversation.
"""
message: Message
# endregion
# region Application State
@dataclass
class HarnessAppState:
"""All UI state for the harness console application.
This state is mutated by the UX state driver and read by the Textual
app to update the UI.
"""
# --- Bottom panel mode ---
mode: BottomPanelMode = BottomPanelMode.TEXT_INPUT
"""Which component is shown in the bottom panel."""
# --- Follow-up question queue ---
pending_questions: list[FollowUpQuestion] = field(default_factory=list)
"""Queue of follow-up questions waiting for user answers.
The head ([0]) is the question currently being displayed; subsequent items
are dispatched in order as each is answered.
"""
accumulated_follow_up_responses: list[Message] = field(default_factory=list)
"""Accumulated follow-up response messages collected during the current agent turn.
Includes both direct FollowUpMessages emitted by observers and continuation
results from answered questions. Consumed by the runner via take_follow_up_responses().
"""
# --- Text input (active in TextInput / Streaming modes) ---
prompt: str = "> "
"""The prompt string for text input mode."""
placeholder: str = ""
"""Placeholder text shown when the input is empty."""
input_text: str = ""
"""The current input text being typed."""
input_enabled: bool = True
"""Whether input is enabled (disabled during streaming without injection)."""
streaming_prompt: str = "(agent is running...)"
"""The prompt to show during streaming when input is disabled."""
# --- List selection (active in ListSelection mode) ---
list_selection_title: str | None = None
"""Title text displayed above the list selection."""
list_selection_options: list[str] = field(default_factory=list)
"""The list selection options."""
list_selection_index: int = 0
"""The highlighted option index in list selection mode."""
list_selection_custom_text_placeholder: str | None = None
"""Placeholder text for the custom text input option in the list."""
list_selection_custom_input_text: str = ""
"""Current text being typed into the list's custom text option."""
# --- Scroll / output area ---
output_entries: list[OutputEntry] = field(default_factory=list)
"""Output entries in the scroll area conversation history."""
queued_items: list[str] = field(default_factory=list)
"""Queued input items to display (pending injected messages)."""
# --- Agent mode + status display ---
mode_color: str | None = None
"""Rich color string for the rule borders and mode label."""
mode_text: str | None = None
"""Current mode name displayed (e.g., 'plan', 'execute')."""
help_text: str | None = None
"""Help text displayed below the bottom rule (available commands)."""
show_spinner: bool = False
"""Whether the agent status spinner is visible."""
usage_text: str | None = None
"""Formatted token usage text to display in the status bar."""
# --- Command handler signals ---
shutdown_requested: bool = False
"""Set to True when /exit is invoked; the app should exit."""
replaced_session: object | None = None
"""When set, the app should swap its session to this AgentSession."""
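The queue semantics described for `pending_questions` and `accumulated_follow_up_responses` can be sketched roughly as follows. This is a minimal illustration, not the runner's actual code: `Question`, `QueueState`, `answer_head`, and the body of `take_follow_up_responses` are hypothetical stand-ins, messages are plain strings rather than `Message` objects, and the continuation omits the state driver argument.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Optional


@dataclass
class Question:
    """Hypothetical stand-in for FollowUpQuestion (continuation takes only the answer)."""

    prompt: str
    continuation: Callable[[str], Awaitable[Optional[str]]]


@dataclass
class QueueState:
    """Hypothetical stand-in for the two queue fields on HarnessAppState."""

    pending_questions: list = field(default_factory=list)
    accumulated_follow_up_responses: list = field(default_factory=list)


async def answer_head(state: QueueState, answer: str) -> None:
    """Answer the question at the head of the queue and keep any resulting message."""
    question = state.pending_questions.pop(0)
    message = await question.continuation(answer)
    if message is not None:
        state.accumulated_follow_up_responses.append(message)


def take_follow_up_responses(state: QueueState) -> list:
    """Return the accumulated messages and reset the buffer (assumed behavior)."""
    taken = state.accumulated_follow_up_responses
    state.accumulated_follow_up_responses = []
    return taken


async def approve(answer: str) -> Optional[str]:
    return f"user chose: {answer}"


async def ignore(answer: str) -> Optional[str]:
    return None  # a continuation may contribute no message


async def demo() -> list:
    state = QueueState(
        pending_questions=[Question("Approve?", approve), Question("Notes?", ignore)]
    )
    await answer_head(state, "yes")  # head of the queue is answered first
    await answer_head(state, "n/a")  # then the next question becomes the head
    return take_follow_up_responses(state)


result = asyncio.run(demo())
```

Only the first continuation returns a message, so only one entry would be carried into the next agent invocation.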
@@ -0,0 +1,65 @@
# Copyright (c) Microsoft. All rights reserved.
"""Command handler package for the harness console.
Provides slash-command handling (e.g., /exit, /mode, /todos, /session-export)
that intercepts user input before it reaches the agent.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from .base import CommandHandler
from .exit_handler import ExitCommandHandler
from .mode_handler import ModeCommandHandler
from .session_handler import SessionCommandHandler
from .todo_handler import TodoCommandHandler
if TYPE_CHECKING:
from agent_framework import Agent
__all__ = [
"CommandHandler",
"ExitCommandHandler",
"ModeCommandHandler",
"SessionCommandHandler",
"TodoCommandHandler",
"build_default_command_handlers",
]
def build_default_command_handlers(
agent: Agent,
*,
mode_colors: dict[str, str] | None = None,
) -> list[CommandHandler]:
"""Build the default set of command handlers by inspecting the agent.
Auto-detects TodoProvider and AgentModeProvider from the agent's
context_providers list.
Args:
agent: The agent to inspect for providers.
mode_colors: Optional mapping of mode names to Rich color strings.
Returns:
List of command handlers in evaluation order.
"""
from agent_framework import AgentModeProvider, TodoProvider
todo_provider: TodoProvider | None = None
mode_provider: AgentModeProvider | None = None
for provider in getattr(agent, "context_providers", []):
if isinstance(provider, TodoProvider) and todo_provider is None:
todo_provider = provider
elif isinstance(provider, AgentModeProvider) and mode_provider is None:
mode_provider = provider
return [
ExitCommandHandler(),
TodoCommandHandler(todo_provider),
ModeCommandHandler(mode_provider, mode_colors),
SessionCommandHandler(),
]
@@ -0,0 +1,58 @@
# Copyright (c) Microsoft. All rights reserved.
"""Abstract base class for console command handlers.
Command handlers intercept user input starting with '/' and execute
local commands before input reaches the agent. They are checked in order;
the first handler that accepts the input prevents further handlers from
being checked.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from agent_framework import AgentSession
from ..state_driver import IUXStateDriver
class CommandHandler(ABC):
"""Base class for console command handlers.
Subclasses implement get_help_text() for the mode bar and
try_handle() to intercept matching commands.
"""
@abstractmethod
def get_help_text(self) -> str | None:
"""Get the help text for this command.
Displayed in the mode-and-help bar. Return None if the
command is not currently available.
Returns:
Help text like '/todos (show todo list)', or None.
"""
...
@abstractmethod
async def try_handle(
self,
user_input: str,
session: AgentSession,
ux: IUXStateDriver,
) -> bool:
"""Attempt to handle the given user input.
Args:
user_input: The raw user input string.
session: The current agent session.
ux: The UX state driver for rendering output.
Returns:
True if this handler handled the input; False otherwise.
"""
...
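As a rough illustration of the first-match-wins contract described in the module docstring, the sketch below defines a custom handler and a dispatch loop. Everything here is a stand-in under stated assumptions: `FakeUx` replaces `IUXStateDriver`, `Handler` mirrors the `CommandHandler` interface so the example runs on its own, and `/ping` and `dispatch` are hypothetical names that do not appear in the package.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Optional


class FakeUx:
    """Hypothetical stand-in for IUXStateDriver that records info lines."""

    def __init__(self) -> None:
        self.lines = []

    def append_info_line(self, text: str, color: Optional[str] = None) -> None:
        self.lines.append(text)


class Handler(ABC):
    """Stand-in mirroring the CommandHandler interface."""

    @abstractmethod
    def get_help_text(self) -> Optional[str]: ...

    @abstractmethod
    async def try_handle(self, user_input: str, session: object, ux: FakeUx) -> bool: ...


class PingHandler(Handler):
    """Hypothetical /ping command, for illustration only."""

    def get_help_text(self) -> Optional[str]:
        return "/ping (check the console is responsive)"

    async def try_handle(self, user_input: str, session: object, ux: FakeUx) -> bool:
        if user_input.strip().lower() != "/ping":
            return False  # not ours; let the next handler look at it
        ux.append_info_line("pong")
        return True


async def dispatch(handlers, user_input: str, session: object, ux: FakeUx) -> bool:
    """Return True as soon as one handler accepts the input."""
    for handler in handlers:
        if await handler.try_handle(user_input, session, ux):
            return True
    return False  # not a command; the caller would forward it to the agent


ux = FakeUx()
handled = asyncio.run(dispatch([PingHandler()], " /PING ", None, ux))
unhandled = asyncio.run(dispatch([PingHandler()], "hello", None, ux))
```

Matching on the stripped, lowercased input follows the same convention the `/exit` and `/todos` handlers use.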
@@ -0,0 +1,35 @@
# Copyright (c) Microsoft. All rights reserved.
"""Exit command handler — /exit to quit the console."""
from __future__ import annotations
from typing import TYPE_CHECKING
from .base import CommandHandler
if TYPE_CHECKING:
from agent_framework import AgentSession
from ..state_driver import IUXStateDriver
class ExitCommandHandler(CommandHandler):
"""Handle the /exit command to shut down the console application."""
def get_help_text(self) -> str | None:
"""Return help text for the exit command."""
return "/exit (quit)"
async def try_handle(
self,
user_input: str,
session: AgentSession,
ux: IUXStateDriver,
) -> bool:
"""Handle /exit by requesting shutdown."""
if user_input.strip().lower() != "/exit":
return False
ux.request_shutdown()
return True
@@ -0,0 +1,81 @@
# Copyright (c) Microsoft. All rights reserved.
"""Mode command handler — /mode to show or switch agent mode."""
from __future__ import annotations
from typing import TYPE_CHECKING
from .base import CommandHandler
if TYPE_CHECKING:
from agent_framework import AgentModeProvider, AgentSession
from ..state_driver import IUXStateDriver
class ModeCommandHandler(CommandHandler):
"""Handle the /mode command to display or switch the current agent mode."""
def __init__(
self,
mode_provider: AgentModeProvider | None,
mode_colors: dict[str, str] | None = None,
) -> None:
"""Initialize with mode provider and color mapping.
Args:
mode_provider: The mode provider, or None if not available.
mode_colors: Optional mapping of mode names to Rich color strings.
"""
self._mode_provider = mode_provider
self._mode_colors = mode_colors or {}
def get_help_text(self) -> str | None:
"""Return help text, or None if mode provider is unavailable."""
if self._mode_provider is None:
return None
return "/mode [plan|execute] (show or switch mode)"
async def try_handle(
self,
user_input: str,
session: AgentSession,
ux: IUXStateDriver,
) -> bool:
"""Handle /mode [name] command."""
stripped = user_input.strip()
lower = stripped.lower()
if not (lower == "/mode" or lower.startswith("/mode ")):
return False
if self._mode_provider is None:
ux.append_info_line("AgentModeProvider is not available.")
return True
parts = stripped.split(None, 1)
if len(parts) < 2:
# Show current mode
from agent_framework import get_agent_mode
current = get_agent_mode(session)
ux.append_info_line(f"Current mode: {current}")
return True
# Switch mode
new_mode = parts[1].strip()
try:
from agent_framework import set_agent_mode
normalized = set_agent_mode(session, new_mode)
color = self._mode_colors.get(normalized)
ux.set_mode(normalized, color)
ux.append_info_line(
f"Switched to {normalized} mode.",
color=color,
)
except ValueError as ex:
ux.append_info_line(str(ex), color="red")
return True
@@ -0,0 +1,107 @@
# Copyright (c) Microsoft. All rights reserved.
"""Session command handler — /session-export and /session-import."""
from __future__ import annotations
import json
from typing import TYPE_CHECKING
from .base import CommandHandler
if TYPE_CHECKING:
from agent_framework import AgentSession
from ..state_driver import IUXStateDriver
class SessionCommandHandler(CommandHandler):
"""Handle /session-export and /session-import commands."""
def get_help_text(self) -> str | None:
"""Return help text for session commands."""
return "/session-export <file> | /session-import <file>"
async def try_handle(
self,
user_input: str,
session: AgentSession,
ux: IUXStateDriver,
) -> bool:
"""Handle session export/import commands."""
stripped = user_input.strip()
command = stripped.split(None, 1)[0].lower() if stripped else ""
if command == "/session-export":
await self._handle_export(stripped, session, ux)
return True
if command == "/session-import":
await self._handle_import(stripped, ux)
return True
return False
async def _handle_export(
self,
user_input: str,
session: AgentSession,
ux: IUXStateDriver,
) -> None:
"""Export the current session to a JSON file."""
parts = user_input.split(None, 1)
if len(parts) < 2:
ux.append_info_line("Usage: /session-export <filename>")
return
filename = parts[1].strip()
try:
serialized = session.to_dict()
json_str = json.dumps(serialized, indent=2)
self._write_file(filename, json_str)
ux.append_info_line(f"Session exported to {filename}")
except Exception as ex:
ux.append_info_line(
f"Failed to export session to {filename}: {ex}",
color="red",
)
async def _handle_import(
self,
user_input: str,
ux: IUXStateDriver,
) -> None:
"""Import a session from a JSON file."""
parts = user_input.split(None, 1)
if len(parts) < 2:
ux.append_info_line("Usage: /session-import <filename>")
return
filename = parts[1].strip()
try:
from agent_framework import AgentSession
json_str = self._read_file(filename)
data = json.loads(json_str)
new_session = AgentSession.from_dict(data)
ux.replace_session(new_session)
ux.append_info_line(f"Session imported from {filename}")
except FileNotFoundError:
ux.append_info_line(f"File not found: {filename}", color="red")
except Exception as ex:
ux.append_info_line(
f"Failed to import session from {filename}: {ex}",
color="red",
)
@staticmethod
def _write_file(filename: str, content: str) -> None:
"""Write content to a file (sync helper to satisfy ASYNC230)."""
with open(filename, "w", encoding="utf-8") as f: # noqa: ASYNC230
f.write(content)
@staticmethod
def _read_file(filename: str) -> str:
"""Read content from a file (sync helper to satisfy ASYNC230)."""
with open(filename, encoding="utf-8") as f: # noqa: ASYNC230
return f.read()
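The handler above performs blocking file I/O inside synchronous helpers and suppresses ASYNC230. One possible alternative, shown only as a sketch and not as what this commit does, is to offload the same helpers with `asyncio.to_thread` (Python 3.9+). The names `write_text`, `read_text`, and `round_trip` are hypothetical, and a plain dict stands in for the output of `session.to_dict()`.

```python
import asyncio
import json
import os
import tempfile


def write_text(filename: str, content: str) -> None:
    """Blocking write, intended to run in a worker thread."""
    with open(filename, "w", encoding="utf-8") as f:
        f.write(content)


def read_text(filename: str) -> str:
    """Blocking read, intended to run in a worker thread."""
    with open(filename, encoding="utf-8") as f:
        return f.read()


async def round_trip(data: dict) -> dict:
    """Export a dict as JSON and import it again without blocking the event loop."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "session.json")
        await asyncio.to_thread(write_text, path, json.dumps(data, indent=2))
        return json.loads(await asyncio.to_thread(read_text, path))


restored = asyncio.run(round_trip({"session_id": "demo", "messages": []}))
```

For small session files the difference is unlikely to be visible, so keeping the synchronous helpers is a reasonable trade-off.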
@@ -0,0 +1,66 @@
# Copyright (c) Microsoft. All rights reserved.
"""Todo command handler — /todos to display the todo list."""
from __future__ import annotations
from typing import TYPE_CHECKING
from .base import CommandHandler
if TYPE_CHECKING:
from agent_framework import AgentSession, TodoProvider
from ..state_driver import IUXStateDriver
class TodoCommandHandler(CommandHandler):
"""Handle the /todos command to display the current todo list."""
def __init__(self, todo_provider: TodoProvider | None) -> None:
"""Initialize with the todo provider.
Args:
todo_provider: The todo provider, or None if not available.
"""
self._todo_provider = todo_provider
def get_help_text(self) -> str | None:
"""Return help text, or None if todo provider is unavailable."""
if self._todo_provider is None:
return None
return "/todos (show todo list)"
async def try_handle(
self,
user_input: str,
session: AgentSession,
ux: IUXStateDriver,
) -> bool:
"""Handle /todos by displaying the todo list."""
if user_input.strip().lower() != "/todos":
return False
if self._todo_provider is None:
ux.append_info_line("TodoProvider is not available.")
return True
todos = await self._todo_provider.store.load_items(
session, source_id=self._todo_provider.source_id
)
if not todos:
ux.append_info_line("No todos yet.")
return True
ux.append_info_line("── Todo List ──")
for item in todos:
status = "✓" if item.is_complete else " "
color = "dim" if item.is_complete else None
description = f" - {item.description}" if item.description else ""
ux.append_info_line(
f"[{status}] #{item.id} {item.title}{description}",
color=color,
)
return True
@@ -0,0 +1,23 @@
# Copyright (c) Microsoft. All rights reserved.
"""UI components for the harness console.
This module provides Textual widgets for building the harness console UI,
including status displays, input fields, choice selectors, and scrolling panels.
"""
from .agent_status import AgentStatus
from .list_selection import HarnessListSelection
from .mode_help import AgentModeAndHelp
from .prompt_rule import PromptRule
from .scroll_panel import HarnessScrollPanel
from .text_input import HarnessTextInput
__all__ = [
"AgentStatus",
"AgentModeAndHelp",
"HarnessListSelection",
"PromptRule",
"HarnessScrollPanel",
"HarnessTextInput",
]
@@ -0,0 +1,66 @@
# Copyright (c) Microsoft. All rights reserved.
"""Agent status widget with spinner animation and usage statistics."""
from __future__ import annotations
from textual.reactive import reactive
from textual.widgets import Static
class AgentStatus(Static):
"""Agent status bar with animated spinner and token usage display.
Displays an animated braille pattern spinner when the agent is active,
along with token usage statistics. The component automatically updates
the spinner animation at ~10fps for smooth visual feedback.
Attributes:
show_spinner: Whether to display the animated spinner.
usage_text: Token usage text to display (e.g., "1.2K in / 856 out").
"""
# Braille pattern spinner frames for smooth animation
SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
show_spinner: reactive[bool] = reactive(False)
usage_text: reactive[str] = reactive("")
def __init__(self, **kwargs) -> None:
"""Initialize the agent status widget."""
super().__init__(**kwargs)
self._spinner_index = 0
def on_mount(self) -> None:
"""Start the spinner animation timer when the widget is mounted."""
# Update spinner at ~10fps (every 0.1 seconds)
self.set_interval(0.1, self._advance_spinner)
def _advance_spinner(self) -> None:
"""Advance the spinner to the next frame."""
if self.show_spinner:
self._spinner_index = (self._spinner_index + 1) % len(self.SPINNER_FRAMES)
self.refresh()
def render(self) -> str:
"""Render the status bar with spinner and usage text.
Returns:
Formatted string with Rich markup for spinner and usage display.
"""
if not self.show_spinner and not self.usage_text:
return ""
parts = []
if self.show_spinner:
frame = self.SPINNER_FRAMES[self._spinner_index]
parts.append(f"[cyan]{frame}[/cyan]")
else:
# Keep consistent spacing when spinner is off
parts.append(" ")
if self.usage_text:
parts.append(f"[dim]{self.usage_text}[/dim]")
return " ".join(parts)
@@ -0,0 +1,269 @@
# Copyright (c) Microsoft. All rights reserved.
"""List selection widget with optional custom text input."""
from __future__ import annotations
from textual import on
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Container
from textual.css.query import NoMatches
from textual.events import Key
from textual.message import Message
from textual.reactive import reactive
from textual.widget import Widget
from textual.widgets import Input, Label, OptionList
from textual.widgets.option_list import Option
class HarnessListSelection(Widget):
"""List selection widget with numbered choices and optional custom text input.
Displays a title, a list of numbered choices that can be selected via
keyboard navigation or number keys (1-9), and an optional custom text
input field at the bottom.
All child nodes (title label, option list, custom input) are always
present in the DOM; visibility is toggled via reactive watchers.
Navigation:
- Down arrow on last list item moves focus to the custom text input
- Up arrow on the custom text input moves focus back to the option list
- When custom input has focus, the option list highlight is cleared
Attributes:
title: The title text displayed above the options.
options: List of option strings to display.
allow_custom_text: Whether to show a custom text input field.
"""
DEFAULT_CSS = """
HarnessListSelection {
height: auto;
max-height: 12;
}
HarnessListSelection .list-selection-container {
height: auto;
}
HarnessListSelection #selection-title {
height: auto;
color: $text;
text-style: bold;
padding: 0 0 0 0;
}
HarnessListSelection #option-list {
height: auto;
max-height: 8;
border: none;
padding: 0;
}
HarnessListSelection #custom-input {
height: auto;
min-height: 1;
margin-top: 0;
border: tall transparent;
}
HarnessListSelection #custom-input:focus {
border: tall $accent;
}
"""
BINDINGS = [
Binding("1", "select_option(0)", "Select option 1", show=False),
Binding("2", "select_option(1)", "Select option 2", show=False),
Binding("3", "select_option(2)", "Select option 3", show=False),
Binding("4", "select_option(3)", "Select option 4", show=False),
Binding("5", "select_option(4)", "Select option 5", show=False),
Binding("6", "select_option(5)", "Select option 6", show=False),
Binding("7", "select_option(6)", "Select option 7", show=False),
Binding("8", "select_option(7)", "Select option 8", show=False),
Binding("9", "select_option(8)", "Select option 9", show=False),
]
title: reactive[str] = reactive("")
options: reactive[list[str]] = reactive(list, always_update=True)
allow_custom_text: reactive[bool] = reactive(False)
class Selected(Message):
"""Message sent when an option is selected.
Attributes:
value: The selected option text or custom text.
"""
def __init__(self, value: str) -> None:
"""Initialize the Selected message.
Args:
value: The selected option text or custom text.
"""
self.value = value
super().__init__()
def compose(self) -> ComposeResult:
"""Compose the widget — all nodes are always present.
Yields:
Title label (hidden if empty), option list, custom input (hidden by default).
"""
with Container(classes="list-selection-container"):
yield Label("", id="selection-title")
yield OptionList(id="option-list")
yield Input(
placeholder="Or type a custom response...",
id="custom-input",
)
def on_mount(self) -> None:
"""Configure initial visibility after mount."""
title_label = self.query_one("#selection-title", Label)
title_label.display = bool(self.title)
custom_input = self.query_one("#custom-input", Input)
custom_input.display = self.allow_custom_text
self._update_options()
def on_key(self, event: Key) -> None:
"""Handle key navigation between option list and custom input.
Args:
event: The key event.
"""
if not self.allow_custom_text:
return
option_list = self.query_one("#option-list", OptionList)
custom_input = self.query_one("#custom-input", Input)
# Down arrow on last item → move to custom input
if event.key == "down" and option_list.has_focus:
last_index = option_list.option_count - 1
if last_index >= 0 and option_list.highlighted == last_index:
option_list.highlighted = None # type: ignore[assignment]
custom_input.focus()
event.prevent_default()
event.stop()
# Up arrow on custom input → move back to option list (last item)
elif event.key == "up" and custom_input.has_focus:
last_index = option_list.option_count - 1
if last_index >= 0:
option_list.highlighted = last_index
option_list.focus()
event.prevent_default()
event.stop()
@on(Input.Changed, "#custom-input")
def on_custom_input_focused_or_changed(self, event: Input.Changed) -> None:
"""Clear option list highlight when user is typing in custom input.
Args:
event: The input changed event.
"""
option_list = self.query_one("#option-list", OptionList)
option_list.highlighted = None # type: ignore[assignment]
def watch_title(self, new_title: str) -> None:
"""Update the title label when the title changes.
Args:
new_title: The new title text.
"""
try:
label = self.query_one("#selection-title", Label)
label.update(new_title)
label.display = bool(new_title)
except NoMatches:
pass
def watch_options(self, new_options: list[str]) -> None:
"""Update the option list when options change.
Args:
new_options: The new list of options.
"""
import contextlib
with contextlib.suppress(NoMatches):
self._update_options()
def watch_allow_custom_text(self, allow: bool) -> None:
"""Show/hide the custom input field.
Args:
allow: Whether to show the custom text input.
"""
try:
custom_input = self.query_one("#custom-input", Input)
custom_input.display = allow
except NoMatches:
pass
def _update_options(self) -> None:
"""Update the OptionList with numbered options."""
try:
option_list = self.query_one("#option-list", OptionList)
option_list.clear_options()
for i, option_text in enumerate(self.options):
display_text = f"{i + 1}. {option_text}" if i < 9 else f" {option_text}"
option_list.add_option(Option(display_text, id=str(i)))
except NoMatches:
pass
@on(OptionList.OptionSelected)
def on_option_selected(self, event: OptionList.OptionSelected) -> None:
"""Handle option selection from the list.
Args:
event: The OptionList.OptionSelected event.
"""
option_index = int(event.option.id or "0")
if 0 <= option_index < len(self.options):
selected_value = self.options[option_index]
self.post_message(self.Selected(selected_value))
@on(Input.Submitted)
def on_input_submitted(self, event: Input.Submitted) -> None:
"""Handle custom text input submission.
Args:
event: The Input.Submitted event.
"""
if self.allow_custom_text and event.value:
self.post_message(self.Selected(event.value))
event.input.clear()
def action_select_option(self, index: int) -> None:
"""Select an option by index (0-based).
Args:
index: The option index to select.
"""
if 0 <= index < len(self.options):
selected_value = self.options[index]
self.post_message(self.Selected(selected_value))
def focus_list(self) -> None:
"""Focus the option list."""
try:
option_list = self.query_one("#option-list", OptionList)
option_list.focus()
except NoMatches:
pass
def focus_custom_input(self) -> None:
"""Focus the custom text input field."""
if self.allow_custom_text:
try:
custom_input = self.query_one("#custom-input", Input)
custom_input.focus()
except NoMatches:
pass
@@ -0,0 +1,48 @@
# Copyright (c) Microsoft. All rights reserved.
"""Agent mode and help text display widget."""
from __future__ import annotations
from rich.text import Text
from textual.reactive import reactive
from textual.widgets import Static
class AgentModeAndHelp(Static):
"""Widget displaying the current agent mode and help text.
Shows the current agent mode (e.g., "plan", "execute") in a colored label,
followed by available commands and help text in a dimmed style. Used in
the fixed bottom area of the console.
Attributes:
mode: Current mode name (e.g., "plan", "execute"), or None if no mode.
mode_color: Rich color string for the mode label (e.g., "yellow", "green").
help_text: Help text to display (e.g., "/exit to quit, /mode to switch").
"""
mode: reactive[str | None] = reactive(None)
mode_color: reactive[str] = reactive("yellow")
help_text: reactive[str] = reactive("")
def render(self) -> Text:
"""Render the mode indicator and help text.
Returns:
Rich Text object with styled mode and help display.
"""
result = Text()
if self.mode:
result.append(f"[{self.mode}]", style=self.mode_color)
if self.help_text:
if self.mode:
result.append(" ")
result.append(self.help_text, style="dim")
if not result.plain:
result.append(" ")
return result
@@ -0,0 +1,31 @@
# Copyright (c) Microsoft. All rights reserved.
"""Mode-colored horizontal rule."""
from __future__ import annotations
from textual.reactive import reactive
from textual.widgets import Static
class PromptRule(Static):
"""A full-width horizontal rule colored by the current agent mode.
Renders a line of '─' characters across the terminal width,
colored to match the current mode (e.g., cyan for plan, green for execute).
Attributes:
rule_color: Rich color string for the rule (e.g., "cyan", "green").
"""
rule_color: reactive[str] = reactive("cyan")
def render(self) -> str:
"""Render the horizontal rule.
Returns:
Formatted string with Rich markup.
"""
color = self.rule_color
width = self.size.width or 80
return f"[{color}]{'─' * width}[/{color}]"
@@ -0,0 +1,127 @@
# Copyright (c) Microsoft. All rights reserved.
"""Scrolling panel for conversation history display."""
from __future__ import annotations
from typing import TYPE_CHECKING
from textual.widgets import RichLog
if TYPE_CHECKING:
from ..app_state import OutputEntry
class HarnessScrollPanel(RichLog):
"""Scrolling panel for displaying conversation history.
Uses Textual's RichLog widget for efficient append-only rendering with
Rich text formatting support. Automatically scrolls to the bottom when
new entries are added.
For streaming text, the panel uses a truncate-and-rewrite strategy: it
tracks where streaming began in the RichLog lines list, and on each update
truncates back to that point and rewrites the full accumulated text as a
single write. This ensures consistent rendering without line-break artifacts
between streamed chunks.
"""
def __init__(self, **kwargs) -> None:
"""Initialize the scroll panel.
Args:
**kwargs: Additional arguments passed to RichLog.
"""
super().__init__(
**kwargs,
auto_scroll=True, # Automatically scroll to bottom
wrap=True, # Wrap long lines instead of horizontal scroll
markup=True, # Enable Rich markup
highlight=True, # Enable syntax highlighting
)
self._entries: list[OutputEntry] = []
self._is_streaming = False
self._streaming_line_start: int = 0
def append_entry(self, entry: OutputEntry) -> None:
"""Append a new output entry to the conversation history.
Args:
entry: The output entry to append.
"""
self._entries.append(entry)
text = self._format_entry(entry)
self.write(text)
def set_streaming_entry(self, entry: OutputEntry) -> None:
"""Set or update the current streaming entry.
On each update, truncates the RichLog back to where streaming
started, then rewrites the full streaming text as a single block.
This ensures no spurious line breaks between chunks while avoiding
a full rewrite of all entries.
Args:
entry: The streaming entry (will be mutated externally).
"""
if not self._is_streaming:
# First streaming chunk — record where streaming lines begin
self._is_streaming = True
self._entries.append(entry)
self._streaming_line_start = len(self.lines)
# Truncate lines back to where streaming started
if len(self.lines) > self._streaming_line_start:
del self.lines[self._streaming_line_start:]
from textual.geometry import Size
self.virtual_size = Size(self._widest_line_width, len(self.lines))
# Write full streaming text as a single renderable
formatted = self._format_text(entry.text, entry.color)
self.write(formatted)
def end_streaming(self) -> None:
"""End the current streaming mode."""
if self._is_streaming:
self._is_streaming = False
self._streaming_line_start = 0
def _rewrite_all(self) -> None:
"""Clear and rewrite all entries from scratch."""
self.clear()
for entry in self._entries:
self.write(self._format_entry(entry))
def _format_entry(self, entry: OutputEntry) -> str:
"""Format an output entry with Rich markup.
Args:
entry: The entry to format.
Returns:
Formatted string with Rich markup for color and styling.
"""
return self._format_text(entry.text, entry.color)
@staticmethod
def _format_text(text: str, color: str | None) -> str:
"""Format text with optional Rich color markup.
Args:
text: The text to format.
color: Optional Rich color name.
Returns:
Formatted string.
"""
if color:
return f"[{color}]{text}[/{color}]"
return text
def clear_history(self) -> None:
"""Clear all conversation history from the panel."""
self._entries.clear()
self._is_streaming = False
self._streaming_line_start = 0
self.clear()
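The truncate-and-rewrite strategy used by `set_streaming_entry` can be illustrated without Textual. In this sketch a plain Python list stands in for the `RichLog` lines list, and `StreamingLog` and `set_streaming_text` are hypothetical names; the real panel additionally updates `virtual_size` and applies Rich markup.

```python
class StreamingLog:
    """Hypothetical list-backed stand-in for the RichLog-based panel."""

    def __init__(self) -> None:
        self.lines = []
        self._is_streaming = False
        self._streaming_line_start = 0

    def write(self, text: str) -> None:
        # One write may produce several lines, as a wrapped renderable would.
        self.lines.extend(text.split("\n"))

    def set_streaming_text(self, text: str) -> None:
        if not self._is_streaming:
            # First chunk: remember where the streamed lines begin.
            self._is_streaming = True
            self._streaming_line_start = len(self.lines)
        # Drop everything written for earlier chunks, then rewrite the full text.
        del self.lines[self._streaming_line_start:]
        self.write(text)

    def end_streaming(self) -> None:
        self._is_streaming = False
        self._streaming_line_start = 0


log = StreamingLog()
log.write("user: hi")
accumulated = ""
for chunk in ["Hel", "lo\nwor", "ld"]:
    accumulated += chunk
    log.set_streaming_text(accumulated)
log.end_streaming()
log.write("done")
```

Because each update rewrites the whole accumulated text, chunk boundaries such as `"Hel"` and `"lo"` never show up as separate lines, and entries written before streaming began are left untouched.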
@@ -0,0 +1,102 @@
# Copyright (c) Microsoft. All rights reserved.
"""Text input widget with inline prompt for the harness console."""
from __future__ import annotations
from textual import on
from textual.app import ComposeResult
from textual.containers import Horizontal
from textual.message import Message
from textual.reactive import reactive
from textual.widget import Widget
from textual.widgets import Input, Label
class HarnessTextInput(Widget):
"""Text input widget with a prompt label on the left.
Displays a prompt (e.g., "> ") followed by a borderless input field.
Sits between the two mode-colored horizontal rules.
Attributes:
prompt: The prompt text displayed on the left (e.g., "> ").
placeholder: Placeholder text shown when the input is empty.
"""
prompt: reactive[str] = reactive("> ")
placeholder: reactive[str] = reactive("")
class Submitted(Message):
"""Message sent when the input is submitted.
Attributes:
value: The submitted text value.
"""
def __init__(self, value: str) -> None:
"""Initialize the Submitted message.
Args:
value: The submitted text value.
"""
self.value = value
super().__init__()
def compose(self) -> ComposeResult:
"""Compose the prompt label and input field.
Yields:
A horizontal container with the prompt and input field.
"""
with Horizontal(classes="prompt-container"):
yield Label(self.prompt, classes="prompt-label", id="prompt-label")
yield Input(placeholder=self.placeholder, classes="input-field", id="input-field")
def watch_prompt(self, new_prompt: str) -> None:
"""Update the prompt label when the prompt attribute changes.
Args:
new_prompt: The new prompt text.
"""
try:
label = self.query_one("#prompt-label", Label)
label.update(new_prompt)
except Exception:
pass
def watch_placeholder(self, new_placeholder: str) -> None:
"""Update the input placeholder when the placeholder attribute changes.
Args:
new_placeholder: The new placeholder text.
"""
try:
input_field = self.query_one("#input-field", Input)
input_field.placeholder = new_placeholder
except Exception:
# Input doesn't exist yet (before compose), ignore
pass
@on(Input.Submitted)
def on_input_submitted(self, event: Input.Submitted) -> None:
"""Handle input submission.
Clears the input field and posts a Submitted message with the value.
Args:
event: The Input.Submitted event.
"""
value = event.value
event.input.clear()
self.post_message(self.Submitted(value))
def focus_input(self) -> None:
"""Focus the input field."""
input_field = self.query_one(".input-field", Input)
input_field.focus()
def clear_input(self) -> None:
"""Clear the input field."""
input_field = self.query_one(".input-field", Input)
input_field.clear()
@@ -0,0 +1,503 @@
# Copyright (c) Microsoft. All rights reserved.
"""Tool call formatters for displaying function calls in the harness console.
This module provides formatters that convert raw function call content into
human-readable display strings. Each formatter handles specific tool patterns
(e.g., web_search, todos_*), and the FallbackToolFormatter provides
generic formatting for any unmatched tools.
Usage:
from harness.console.formatters import build_default_formatters, format_tool_call
from agent_framework import Content
call = Content.from_function_call(
call_id="call_1",
name="web_search",
arguments={"query": "Python async"}
)
formatters = build_default_formatters()
result = format_tool_call(formatters, call) # "web_search (Python async)"
"""
from __future__ import annotations
import contextlib
import json
from abc import ABC, abstractmethod
from typing import Any
from agent_framework import Content
# region Helper Functions
def get_argument_value(call: Content, param_name: str) -> Any:
"""Extract an argument value from a function call.
Handles both dict and JSON string arguments.
Args:
call: The function call content.
param_name: The parameter name to extract.
Returns:
The argument value, or None if not found.
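Example (illustrative sketch): the two accepted argument shapes, shown with a
standalone copy of the lookup logic. `FakeCall` and `lookup` are hypothetical
stand-ins introduced for this example only; they are not part of
agent_framework.

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeCall:
    # Hypothetical stand-in for Content; only the fields used here.
    name: str
    arguments: Any = None


def lookup(call: FakeCall, param_name: str) -> Any:
    # Accept either a dict or a JSON string that decodes to an object.
    args = call.arguments
    if isinstance(args, str):
        try:
            args = json.loads(args)
        except (json.JSONDecodeError, TypeError):
            return None
    if not isinstance(args, dict):
        return None
    return args.get(param_name)


as_dict = lookup(FakeCall("web_search", {"query": "Python async"}), "query")
as_json = lookup(FakeCall("web_search", '{"query": "Python async"}'), "query")
not_json = lookup(FakeCall("web_search", "not json"), "query")
json_list = lookup(FakeCall("web_search", "[1, 2]"), "query")
```

Both shapes yield the same value; malformed JSON and non-object JSON yield None.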
"""
if call.arguments is None:
return None
if isinstance(call.arguments, str):
# arguments is a JSON string, parse it
try:
args_dict = json.loads(call.arguments)
except (json.JSONDecodeError, TypeError):
return None
if not isinstance(args_dict, dict):
return None
elif isinstance(call.arguments, dict):
args_dict = call.arguments
else:
return None
return args_dict.get(param_name)
def as_int_list(value: Any) -> list[int] | None:
"""Convert a value to a list of integers, or None if not possible.
Args:
value: The value to convert (should be a list).
Returns:
A list of integers, or None if conversion fails.
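Example (illustrative sketch): items that cannot be converted are dropped
silently, and an empty result collapses to None. `to_int_list` is a standalone
copy of the rules above, named differently to mark it as an illustration.

```python
import contextlib
from typing import Any


def to_int_list(value: Any):
    # Only lists are accepted; anything else yields None.
    if not isinstance(value, list):
        return None
    result = []
    for item in value:
        if isinstance(item, int):
            result.append(item)
        else:
            # int(...) failures are suppressed, so bad items are skipped.
            with contextlib.suppress(ValueError, TypeError):
                result.append(int(item))
    return result if result else None


mixed = to_int_list([1, "2", "x", None, 3.0])
all_bad = to_int_list(["x", None])
empty = to_int_list([])
scalar = to_int_list("12")
```

Callers therefore only need a single `if not ids` check to cover every failure case.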
"""
if not isinstance(value, list):
return None
result: list[int] = []
for item in value:
if isinstance(item, int):
result.append(item)
else:
with contextlib.suppress(ValueError, TypeError):
result.append(int(item))
return result if result else None
def as_dict_list(value: Any) -> list[dict[str, Any]] | None:
"""Convert a value to a list of dicts, or None if not possible.
Args:
value: The value to convert (should be a list).
Returns:
A list of dicts, or None if value is not a list of dicts.
"""
if not isinstance(value, list):
return None
result: list[dict[str, Any]] = []
for item in value:
if isinstance(item, dict):
result.append(item)
return result if result else None
def truncate(text: str, max_length: int) -> str:
"""Truncate a string to the specified maximum length, appending an ellipsis if truncated.
Args:
text: The text to truncate.
max_length: The maximum length.
Returns:
The truncated string.
"""
return text if len(text) <= max_length else text[:max_length] + "…"
# endregion
# region Base Class
class ToolCallFormatter(ABC):
"""Base class for tool call formatters that produce human-readable display strings
for function call content items shown in the console.
"""
@abstractmethod
def can_format(self, call: Content) -> bool:
"""Return True if this formatter can handle the given function call.
Args:
call: The function call content to check.
Returns:
True if this formatter should be used; otherwise False.
"""
...
@abstractmethod
def format_detail(self, call: Content) -> str | None:
"""Return the detail portion of the formatted output for the given tool call,
or None if only the tool name should be displayed.
Args:
call: The function call content to format.
Returns:
A detail string to append after the tool name, or None.
"""
...
# endregion
# region Concrete Formatters
class FallbackToolFormatter(ToolCallFormatter):
"""Catch-all formatter that handles any tool not matched by a more specific formatter.
Displays a generic summary of the tool's arguments. This formatter should always be
placed last in the formatter list.
"""
def can_format(self, call: Content) -> bool:
"""Always returns True - this formatter matches everything."""
return True
def format_detail(self, call: Content) -> str | None:
"""Format arguments as generic (key: value, ...) pairs."""
if call.arguments is None:
return None
# Parse arguments
if isinstance(call.arguments, str):
try:
args_dict = json.loads(call.arguments)
except (json.JSONDecodeError, TypeError):
return None
elif isinstance(call.arguments, dict):
args_dict = call.arguments
else:
return None
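# JSON may decode to a list or scalar; only non-empty dicts can be summarized
if not isinstance(args_dict, dict) or not args_dict: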
return None
# Build argument list
parts: list[str] = []
for key, value in args_dict.items():
if value is None:
continue
# Convert value to string
if isinstance(value, bool):
str_value = "true" if value else "false"
elif isinstance(value, (int, float)):
str_value = str(value)
elif isinstance(value, str):
str_value = value
else:
# Complex types - skip for now
continue
parts.append(f"{key}: {truncate(str_value, 40)}")
return f"({', '.join(parts)})" if parts else None
class WebSearchToolFormatter(ToolCallFormatter):
"""Formats web_search tool calls, showing the search query."""
def can_format(self, call: Content) -> bool:
"""Match web_search tool calls."""
return call.name == "web_search"
def format_detail(self, call: Content) -> str | None:
"""Extract and format the query parameter."""
value = get_argument_value(call, "query")
return f"({value})" if value else None
class TodoToolFormatter(ToolCallFormatter):
"""Formats todos_* tool calls with tree-view output for added items
and structured output for complete/remove operations.
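Example (illustrative sketch): how the tree-view connectors are chosen. The
helper `tree_lines` is hypothetical and returns the lines as a list rather
than the joined string the formatter produces.

```python
def tree_lines(labels: list[str]) -> list[str]:
    # Every entry gets a tee connector except the last, which gets a corner.
    last = len(labels) - 1
    return [
        f"{'└─' if i == last else '├─'} {label}"
        for i, label in enumerate(labels)
    ]


lines = tree_lines(["Draft outline", "Collect sources", "Write summary"])
single = tree_lines(["Only item"])
```

A single-item list gets the corner connector only, so the tree always closes cleanly.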
"""
def can_format(self, call: Content) -> bool:
"""Match todos_* tool calls."""
return call.name is not None and call.name.startswith("todos_")
def format_detail(self, call: Content) -> str | None:
"""Format based on the specific todos operation."""
if call.name == "todos_add":
return self._format_add_todos(call)
if call.name == "todos_complete":
return self._format_complete_todos(call)
if call.name == "todos_remove":
return self._format_id_list(call, "ids", "Remove")
return None
def _format_add_todos(self, call: Content) -> str | None:
"""Format todos_add with tree view of titles."""
todos = as_dict_list(get_argument_value(call, "todos"))
if not todos:
return None
titles: list[str] = []
for todo in todos:
title = todo.get("title")
if title and isinstance(title, str):
titles.append(title)
if not titles:
return None
# Build tree view
count = len(titles)
plural = "s" if count != 1 else ""
lines = [f"({count} item{plural})"]
for i, title in enumerate(titles):
connector = "├─" if i < count - 1 else "└─"
lines.append(f"\n {connector} {title}")
return "".join(lines)
def _format_complete_todos(self, call: Content) -> str | None:
"""Format todos_complete with tree view of IDs and reasons."""
items = as_dict_list(get_argument_value(call, "items"))
if not items:
return None
entries: list[tuple[int, str | None]] = []
for item in items:
todo_id = item.get("id")
if not isinstance(todo_id, int):
continue
reason = item.get("reason")
reason_str = str(reason) if reason is not None and not isinstance(reason, str) else reason
entries.append((todo_id, reason_str))
if not entries:
return None
# Build tree view
lines: list[str] = []
for i, (todo_id, reason) in enumerate(entries):
connector = "├─" if i < len(entries) - 1 else "└─"
line = f"\n {connector} Complete #{todo_id}"
if reason:
line += f": {truncate(reason, 80)}"
lines.append(line)
return "".join(lines)
def _format_id_list(self, call: Content, param_name: str, verb: str) -> str | None:
"""Format a list of IDs with a verb (e.g., Remove #1, Remove #2)."""
ids = as_int_list(get_argument_value(call, param_name))
if not ids:
return None
lines: list[str] = []
for i, todo_id in enumerate(ids):
connector = "├─" if i < len(ids) - 1 else "└─"
lines.append(f"\n {connector} {verb} #{todo_id}")
return "".join(lines)
class ModeToolFormatter(ToolCallFormatter):
"""Formats AgentMode_* tool calls, showing the target mode for Set operations."""
def can_format(self, call: Content) -> bool:
"""Match AgentMode_* tool calls."""
return call.name is not None and call.name.startswith("AgentMode_")
def format_detail(self, call: Content) -> str | None:
"""Format based on the specific AgentMode operation."""
if call.name == "AgentMode_Set":
value = get_argument_value(call, "mode")
return f"({value})" if value else None
return None
class BackgroundAgentToolFormatter(ToolCallFormatter):
"""Formats BackgroundAgents_* tool calls with human-readable details
for task start, continue, wait, and result retrieval operations.
"""
def can_format(self, call: Content) -> bool:
"""Match BackgroundAgents_* tool calls."""
return call.name is not None and call.name.startswith("BackgroundAgents_")
def format_detail(self, call: Content) -> str | None:
"""Format based on the specific BackgroundAgents operation."""
if call.name == "BackgroundAgents_StartTask":
return self._format_start_background_task(call)
if call.name == "BackgroundAgents_WaitForFirstCompletion":
return self._format_id_list(call, "taskIds", "Wait for")
if call.name == "BackgroundAgents_GetTaskResults":
return self._format_single_id(call, "taskId")
if call.name == "BackgroundAgents_ContinueTask":
return self._format_continue_task(call)
if call.name == "BackgroundAgents_ClearCompletedTask":
return self._format_single_id(call, "taskId")
return None
def _format_start_background_task(self, call: Content) -> str | None:
"""Format StartTask with agent name and description."""
agent_name = get_argument_value(call, "agentName")
description = get_argument_value(call, "description")
if agent_name is None and description is None:
return None
lines: list[str] = []
if agent_name is not None and description is not None:
lines.append(f"\n ├─ Agent: {agent_name}")
# Arguments come from the model, so coerce to str before truncating
lines.append(f'\n └─ "{truncate(str(description), 80)}"')
elif agent_name is not None:
lines.append(f"\n └─ Agent: {agent_name}")
else:
lines.append(f'\n └─ "{truncate(str(description), 80)}"')
return "".join(lines)
def _format_id_list(self, call: Content, param_name: str, verb: str) -> str | None:
"""Format a list of task IDs with a verb."""
ids = as_int_list(get_argument_value(call, param_name))
if not ids:
return None
lines: list[str] = []
for i, task_id in enumerate(ids):
connector = "├─" if i < len(ids) - 1 else "└─"
lines.append(f"\n {connector} {verb} #{task_id}")
return "".join(lines)
def _format_single_id(self, call: Content, param_name: str) -> str | None:
"""Format a single task ID in parentheses."""
task_id = get_argument_value(call, param_name)
if isinstance(task_id, int):
return f"(task #{task_id})"
return None
def _format_continue_task(self, call: Content) -> str | None:
"""Format ContinueTask with task ID and optional text."""
task_id = get_argument_value(call, "taskId")
text = get_argument_value(call, "text")
if not isinstance(task_id, int):
return None
if text:
lines = [
f"\n ├─ Task #{task_id}",
f'\n └─ "{truncate(str(text), 80)}"',
]
return "".join(lines)
return f"\n └─ Task #{task_id}"
class FileMemoryToolFormatter(ToolCallFormatter):
"""Formats FileMemory_* tool calls, showing file names and search patterns
with tree-view corners for save operations.
"""
def can_format(self, call: Content) -> bool:
"""Match FileMemory_* tool calls."""
return call.name is not None and call.name.startswith("FileMemory_")
def format_detail(self, call: Content) -> str | None:
"""Format based on the specific FileMemory operation."""
if call.name == "FileMemory_SaveFile":
return self._format_save_file(call)
if call.name in ("FileMemory_ReadFile", "FileMemory_DeleteFile"):
value = get_argument_value(call, "fileName")
return f"({value})" if value else None
if call.name == "FileMemory_SearchFiles":
return self._format_search_files(call)
return None
def _format_save_file(self, call: Content) -> str | None:
"""Format SaveFile with file name and description indicator."""
file_name = get_argument_value(call, "fileName")
description = get_argument_value(call, "description")
if not file_name:
return None
if description:
return f"\n └─ {file_name} (with description)"
return f"\n └─ {file_name}"
def _format_search_files(self, call: Content) -> str | None:
"""Format SearchFiles with regex pattern and optional file pattern."""
pattern = get_argument_value(call, "regexPattern")
file_pattern = get_argument_value(call, "filePattern")
if not pattern:
return None
if file_pattern:
return f"(/{pattern}/ in {file_pattern})"
return f"(/{pattern}/)"
# endregion
# region Public API Functions
def format_tool_call(formatters: list[ToolCallFormatter], call: Content) -> str:
"""Format a tool call using the first matching formatter from the provided list.
Returns "{toolName} {detail}" when a formatter produces detail,
or just "{toolName}" otherwise.
Args:
formatters: List of formatters to try in order.
call: The function call content to format.
Returns:
Formatted string representation of the tool call.
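Example (illustrative sketch): first-match dispatch and why a catch-all
formatter belongs at the end of the list. `FakeCall`, `QueryFormatter`,
`CatchAll`, and `dispatch` are hypothetical stand-ins for Content, the
built-in formatters, and this function.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class FakeCall:
    # Hypothetical stand-in for Content.
    name: str | None
    arguments: dict[str, Any] | None = None


class QueryFormatter:
    # Hypothetical specific formatter: handles only "web_search".
    def can_format(self, call: FakeCall) -> bool:
        return call.name == "web_search"

    def format_detail(self, call: FakeCall) -> str | None:
        query = (call.arguments or {}).get("query")
        return f"({query})" if query else None


class CatchAll:
    # Hypothetical fallback: matches everything, shows no detail.
    def can_format(self, call: FakeCall) -> bool:
        return True

    def format_detail(self, call: FakeCall) -> str | None:
        return None


def dispatch(formatters: list[Any], call: FakeCall) -> str:
    # The first formatter whose can_format() returns True wins.
    tool_name = call.name or "Unknown"
    for formatter in formatters:
        if formatter.can_format(call):
            detail = formatter.format_detail(call)
            return f"{tool_name} {detail}" if detail is not None else tool_name
    return tool_name


search = FakeCall("web_search", {"query": "Python async"})
specific_first = dispatch([QueryFormatter(), CatchAll()], search)
fallback_first = dispatch([CatchAll(), QueryFormatter()], search)
no_formatters = dispatch([], FakeCall(None))
```

With the catch-all first, the specific formatter is never consulted and the query detail is lost.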
"""
for formatter in formatters:
if formatter.can_format(call):
detail = formatter.format_detail(call)
tool_name = call.name or "Unknown"
return f"{tool_name} {detail}" if detail is not None else tool_name
return call.name or "Unknown"
def build_default_formatters() -> list[ToolCallFormatter]:
"""Create the default list of tool call formatters.
The FallbackToolFormatter is always last. Users can call this function
and combine the result with their own formatters.
Returns:
A list of all built-in tool call formatters.
"""
return [
TodoToolFormatter(),
ModeToolFormatter(),
BackgroundAgentToolFormatter(),
FileMemoryToolFormatter(),
WebSearchToolFormatter(),
FallbackToolFormatter(),
]
# endregion
@@ -0,0 +1,87 @@
# Copyright (c) Microsoft. All rights reserved.
"""Main entry point for the harness console.
Provides the top-level run_agent_async() function that creates and runs
the Textual-based harness console application.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from .app import HarnessApp
from .observers import build_default_observers
if TYPE_CHECKING:
from agent_framework import Agent, AgentSession
from .commands import CommandHandler
from .observers.base import ConsoleObserver
async def run_agent_async(
agent: Agent,
*,
session: AgentSession | None = None,
observers: list[ConsoleObserver] | None = None,
command_handlers: list[CommandHandler] | None = None,
mode_colors: dict[str, str] | None = None,
initial_mode: str | None = None,
placeholder: str = "Type a message and press Enter...",
title: str = "Harness Console",
max_context_window_tokens: int | None = None,
max_output_tokens: int | None = None,
) -> None:
"""Run the harness console with the given agent.
This is the main entry point for the harness console. Creates a Textual
application with the configured observers and runs it until the user exits.
Args:
agent: The agent to run conversations with.
session: Optional agent session for conversation history.
observers: List of console observers. If None, uses defaults.
command_handlers: List of command handlers. If None, auto-detected from agent.
mode_colors: Mapping of mode names to Rich color strings.
initial_mode: Initial agent mode text.
placeholder: Input placeholder text.
title: Application title.
max_context_window_tokens: Optional max context window size for usage display.
max_output_tokens: Optional max output tokens for usage display.
Example:
.. code-block:: python
from agent_framework import Agent
from agent_framework.openai import OpenAIChatClient
from console import run_agent_async
agent = Agent(
client=OpenAIChatClient(),
instructions="You are helpful.",
)
await run_agent_async(agent)
"""
# Only None selects the defaults; an explicitly empty list is respected
resolved_observers = observers if observers is not None else build_default_observers()
resolved_mode_colors = mode_colors or {
"plan": "cyan",
"execute": "green",
}
resolved_session = session or agent.create_session()
app = HarnessApp(
agent=agent,
observers=resolved_observers,
session=resolved_session,
mode_colors=resolved_mode_colors,
initial_mode=initial_mode,
placeholder=placeholder,
title=title,
max_context_window_tokens=max_context_window_tokens,
max_output_tokens=max_output_tokens,
command_handlers=command_handlers,
)
await app.run_async()
@@ -0,0 +1,122 @@
# Copyright (c) Microsoft. All rights reserved.
"""Console observers for agent streaming lifecycle.
This module provides observers that display events during agent streaming
and collect follow-up actions. All observers use the IUXStateDriver interface
to update the UI.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from .base import ConsoleObserver
from .error_display import ErrorDisplayObserver
from .planning_output import PlanningOutputObserver
from .reasoning_display import ReasoningDisplayObserver
from .text_output import TextOutputObserver
from .tool_approval import ToolApprovalObserver
from .tool_call_display import ToolCallDisplayObserver
from .usage_display import UsageDisplayObserver
if TYPE_CHECKING:
from agent_framework import Agent
def build_default_observers() -> list[ConsoleObserver]:
"""Build the default set of observers for the harness console.
Returns a standard observer list covering:
- Text output (streaming text display)
- Tool call display (formatted tool invocations)
- Error display (error messages)
- Usage display (token counts)
- Reasoning display (reasoning/thinking blocks)
- Tool approval (user approval for tool calls)
Note: PlanningOutputObserver is NOT included here because it requires
a mode_provider. Use build_observers_with_planning() for agents that
have an AgentModeProvider (i.e. agents created with create_harness_agent).
Returns:
List of default console observers.
"""
return [
TextOutputObserver(),
ToolCallDisplayObserver(),
ErrorDisplayObserver(),
UsageDisplayObserver(),
ReasoningDisplayObserver(),
ToolApprovalObserver(),
]
def build_observers_with_planning(
agent: Agent,
plan_mode_name: str = "plan",
execution_mode_name: str = "execute",
*,
mode_colors: dict[str, str] | None = None,
) -> list[ConsoleObserver]:
"""Build observers with planning support (structured output in plan mode).
Replaces TextOutputObserver with PlanningOutputObserver, which configures
structured JSON output via response_format when in plan mode. This enables
the list picker UI for clarification and approval questions.
Requires that the agent has an AgentModeProvider in its context_providers
(automatically added by create_harness_agent).
Args:
agent: The agent to resolve the AgentModeProvider from.
plan_mode_name: The mode name that represents planning mode.
execution_mode_name: The mode name to switch to on approval.
mode_colors: Optional mapping of mode names to Rich color strings.
Returns:
List of observers with planning support.
Raises:
ValueError: If the agent has no AgentModeProvider.
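Example (illustrative sketch): the provider lookup and its failure path.
`FakeModeProvider`, `OtherProvider`, `FakeAgent`, and `find_mode_provider`
are hypothetical stand-ins; the real function searches for AgentModeProvider
on an agent_framework Agent.

```python
from __future__ import annotations

from dataclasses import dataclass, field


class FakeModeProvider:
    # Hypothetical stand-in for AgentModeProvider.
    pass


class OtherProvider:
    # Hypothetical unrelated context provider.
    pass


@dataclass
class FakeAgent:
    # Hypothetical stand-in for Agent; only context_providers is modelled.
    context_providers: list[object] = field(default_factory=list)


def find_mode_provider(agent: FakeAgent) -> FakeModeProvider:
    # next() with a default returns the first match, or None if there is none.
    provider = next(
        (p for p in agent.context_providers if isinstance(p, FakeModeProvider)),
        None,
    )
    if provider is None:
        raise ValueError("agent has no mode provider")
    return provider


expected = FakeModeProvider()
found = find_mode_provider(FakeAgent([OtherProvider(), expected]))

try:
    find_mode_provider(FakeAgent([OtherProvider()]))
    missing_raised = False
except ValueError:
    missing_raised = True
```

Failing fast here surfaces a misconfigured agent before the console starts.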
"""
from agent_framework import AgentModeProvider
mode_provider = next(
(p for p in agent.context_providers if isinstance(p, AgentModeProvider)),
None,
)
if mode_provider is None:
msg = (
"Planning observers require an AgentModeProvider on the agent. "
"Use create_harness_agent() or add AgentModeProvider to context_providers."
)
raise ValueError(msg)
return [
ToolCallDisplayObserver(),
ToolApprovalObserver(),
ErrorDisplayObserver(),
ReasoningDisplayObserver(),
UsageDisplayObserver(),
PlanningOutputObserver(
mode_provider,
plan_mode_name,
execution_mode_name,
mode_colors=mode_colors,
),
]
__all__ = [
"ConsoleObserver",
"ErrorDisplayObserver",
"PlanningOutputObserver",
"ReasoningDisplayObserver",
"TextOutputObserver",
"ToolApprovalObserver",
"ToolCallDisplayObserver",
"UsageDisplayObserver",
"build_default_observers",
"build_observers_with_planning",
]
@@ -0,0 +1,125 @@
# Copyright (c) Microsoft. All rights reserved.
"""Base class for console observers.
Observers participate in the agent streaming lifecycle, displaying events
and optionally returning follow-up actions.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from agent_framework import Agent, Content, Message
from ..app_state import FollowUpAction
from ..state_driver import IUXStateDriver
class ConsoleObserver:
"""Base class for console observers.
Observers participate in the agent streaming lifecycle, displaying
events (tool calls, errors, reasoning, etc.) and optionally returning
follow-up actions (questions, approval requests).
All methods have default no-op implementations, so subclasses only
override the methods they need.
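Example (illustrative sketch): a subclass that overrides a single hook.
`RecordingUX`, `NoOpObserver`, `ShoutingObserver`, and `feed` are hypothetical
stand-ins so the sketch runs without agent_framework or Textual; a real
observer would subclass ConsoleObserver and receive an IUXStateDriver.

```python
import asyncio
from typing import Any


class RecordingUX:
    # Hypothetical stand-in for IUXStateDriver; records written text.
    def __init__(self) -> None:
        self.written = []

    def write_text(self, text: str) -> None:
        self.written.append(text)


class NoOpObserver:
    # Stand-in for ConsoleObserver with just the hook used below.
    async def on_text(self, ux: Any, text: str, agent: Any, session: Any) -> None:
        pass


class ShoutingObserver(NoOpObserver):
    # Overrides only on_text; other hooks would keep their no-op defaults.
    async def on_text(self, ux: Any, text: str, agent: Any, session: Any) -> None:
        ux.write_text(text.upper())


async def feed(observer: NoOpObserver, chunks: list) -> list:
    # Push each streamed chunk through the observer, as a runner would.
    ux = RecordingUX()
    for chunk in chunks:
        await observer.on_text(ux, chunk, agent=None, session=None)
    return ux.written


shouted = asyncio.run(feed(ShoutingObserver(), ["hel", "lo"]))
silent = asyncio.run(feed(NoOpObserver(), ["hel", "lo"]))
```

The base hook writes nothing, so an observer that does not override on_text stays silent.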
"""
def configure_run_options(
self,
options: dict[str, Any],
agent: Agent,
session: Any,
) -> None:
"""Configure run options before agent invocation.
Override to set options such as response_format, max_tokens, etc.
Args:
options: Dictionary of chat options to modify.
agent: The AI agent.
session: The agent session.
"""
pass
async def on_response_update(
self,
ux: IUXStateDriver,
update: Message,
agent: Agent,
session: Any,
) -> None:
"""Called for each response update chunk.
Override to inspect update-level metadata or handle provider-specific
events in the raw representation.
Args:
ux: The UX state driver for UI updates.
update: The message update chunk.
agent: The AI agent.
session: The agent session.
"""
pass
async def on_content(
self,
ux: IUXStateDriver,
content: Content,
agent: Agent,
session: Any,
) -> None:
"""Called for each content item in the response.
Override to handle specific content types (function calls, errors, etc.).
Args:
ux: The UX state driver for UI updates.
content: The content item from the response.
agent: The AI agent.
session: The agent session.
"""
pass
async def on_text(
self,
ux: IUXStateDriver,
text: str,
agent: Agent,
session: Any,
) -> None:
"""Called for each text chunk in the response.
Override to accumulate and display streaming text.
Args:
ux: The UX state driver for UI updates.
text: The text chunk.
agent: The AI agent.
session: The agent session.
"""
pass
async def on_stream_complete(
self,
ux: IUXStateDriver,
agent: Agent,
session: Any,
) -> list[FollowUpAction] | None:
"""Called when streaming completes.
Override to return follow-up actions (questions to ask the user,
messages to inject into the next turn, etc.).
Args:
ux: The UX state driver for UI updates.
agent: The AI agent.
session: The agent session.
Returns:
Optional list of follow-up actions to queue, or None.
"""
return None
@@ -0,0 +1,72 @@
# Copyright (c) Microsoft. All rights reserved.
"""Error display observer for showing errors."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from .base import ConsoleObserver
if TYPE_CHECKING:
from agent_framework import Agent, Content
from ..state_driver import IUXStateDriver
class ErrorDisplayObserver(ConsoleObserver):
"""Displays error content from the agent response.
Shows errors with an ❌ prefix in red to make them easily visible.
"""
async def on_content(
self,
ux: IUXStateDriver,
content: Content,
agent: Agent,
session: Any,
) -> None:
"""Display error content.
Args:
ux: The UX state driver for UI updates.
content: The content item to check for errors.
agent: The AI agent.
session: The agent session.
"""
# Check if this is an error content type
# The exact content type check depends on the agent framework's Content class
if hasattr(content, "type") and content.type == "error":
error_text = self._format_error(content)
ux.append_info_line(error_text, "red")
elif getattr(content, "error", None):
error_text = f"❌ Error: {content.error}" # type: ignore[reportAttributeAccessIssue]
ux.append_info_line(error_text, "red")
def _format_error(self, content: Content) -> str:
"""Format error content for display.
Args:
content: The error content.
Returns:
Formatted error string.
"""
error_text = "❌ Error"
# Try to extract error message
if hasattr(content, "message"):
error_text += f": {content.message}"
elif hasattr(content, "text"):
error_text += f": {content.text}"
# Try to add error code if available
if hasattr(content, "error_code") and content.error_code:
error_text += f" (code: {content.error_code})"
# Try to add details if available
if hasattr(content, "details") and getattr(content, "details", None):
error_text += f" - {content.details}" # type: ignore[reportAttributeAccessIssue]
return error_text
@@ -0,0 +1,71 @@
# Copyright (c) Microsoft. All rights reserved.
"""Pydantic models for structured planning output.
These models define the JSON schema that the agent produces when in planning
mode via `response_format`. The schema enables consistent rendering of
clarification questions and approval requests in the console UI.
"""
from __future__ import annotations
from enum import Enum
from pydantic import BaseModel, Field
class PlanningResponseType(str, Enum):
"""Type of planning response from the agent."""
CLARIFICATION = "clarification"
"""The agent needs clarification and presents options for the user to choose from."""
APPROVAL = "approval"
"""The agent is seeking approval to proceed with execution."""
class PlanningQuestion(BaseModel):
"""A single question or item within a PlanningResponse.
For clarification: contains the question text and optional choices.
For approval: contains the plan summary for the user to approve.
"""
message: str = Field(
description=(
"For clarifications, this has the question that needs to be clarified "
"with the user. For approvals, this would contain a summary of the "
"execution plan that the user needs to approve."
),
)
choices: list[str] | None = Field(
default=None,
description=(
"For clarifications, this has a list of options that the user can "
"choose from. null for approvals."
),
)
class PlanningResponse(BaseModel):
"""Structured response from the agent while in planning mode.
Used with structured output (`response_format`) to enable consistent
rendering of clarification questions and approval requests.
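Example (illustrative sketch): the two payload shapes this schema describes,
written as plain dicts. The question text and choices are invented for
illustration, and validation through the model itself is not shown.

```python
import json

# A clarification carries one or more questions, each with choices.
clarification = {
    "type": "clarification",
    "questions": [
        {
            "message": "Which sources should the research cover?",
            "choices": ["Academic papers", "News articles", "Both"],
        }
    ],
}

# An approval carries exactly one item with the plan summary and no choices.
approval = {
    "type": "approval",
    "questions": [
        {"message": "Plan: search, summarize, report.", "choices": None}
    ],
}

# Round-trip through JSON text, the form in which the agent emits it.
approval_json = json.dumps(approval)
decoded = json.loads(json.dumps(clarification))
```

Python's None serializes as JSON null, matching the "null for approvals" wording on `choices`.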
"""
type: PlanningResponseType = Field(
description=(
"Use 'clarification' when you need clarification around the user "
"request and you want to present the user with options to choose from. "
"Use 'approval' when you are ready to start execution, but need "
"approval to start executing."
),
)
questions: list[PlanningQuestion] = Field(
description=(
"For clarifications, this has one or more questions to ask the user "
"(each with choices). For approvals, this has exactly one item "
"containing the plan summary for the user to approve."
),
)
@@ -0,0 +1,242 @@
# Copyright (c) Microsoft. All rights reserved.
"""Planning output observer for structured agent responses in plan mode.
In planning mode, this observer configures structured JSON output via
response_format, collects streamed text silently, then deserializes the
result as a PlanningResponse to present clarification/approval questions.
In execution mode, text is streamed through directly.
"""
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Any
from rich.markup import escape
from ..app_state import (
ChoiceFollowUpQuestion,
FollowUpAction,
TextFollowUpQuestion,
)
from .base import ConsoleObserver
from .planning_models import PlanningResponse, PlanningResponseType
if TYPE_CHECKING:
from agent_framework import Agent, AgentModeProvider, Message
from ..state_driver import IUXStateDriver
class PlanningOutputObserver(ConsoleObserver):
"""Mode-aware observer that uses structured output in plan mode.
In planning mode:
- Configures response_format to PlanningResponse schema
- Collects streamed text silently
- Deserializes JSON into PlanningResponse
- Builds follow-up questions (clarification or approval)
In execution mode:
- Streams text directly to the UX driver
If JSON parsing fails, falls back to rendering the raw text as regular
output so the user always sees what the agent produced.
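Example (illustrative sketch): the parse-or-fall-back decision, approximated
with stdlib json checks. `parse_planning_text` is hypothetical; the observer
itself validates with PlanningResponse.model_validate_json rather than these
manual checks.

```python
from __future__ import annotations

import json


def parse_planning_text(collected: str) -> tuple[str, object]:
    # Returns ("structured", payload) when the text is a JSON object with the
    # expected keys, otherwise ("raw", original_text) for plain rendering.
    try:
        payload = json.loads(collected)
    except json.JSONDecodeError:
        return ("raw", collected)
    if not isinstance(payload, dict):
        return ("raw", collected)
    if payload.get("type") not in ("clarification", "approval"):
        return ("raw", collected)
    if not isinstance(payload.get("questions"), list):
        return ("raw", collected)
    return ("structured", payload)


good = parse_planning_text(
    '{"type": "approval", "questions": [{"message": "Run the plan?"}]}'
)
bad = parse_planning_text("Sure, here is my plan: step one...")
wrong_shape = parse_planning_text('{"type": "other", "questions": []}')
```

Every failure path returns the original text, so nothing the agent produced is lost.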
"""
def __init__(
self,
mode_provider: AgentModeProvider,
plan_mode_name: str,
execution_mode_name: str,
*,
mode_colors: dict[str, str] | None = None,
) -> None:
"""Initialize the planning output observer.
Args:
mode_provider: The mode provider for reading/switching modes.
plan_mode_name: The mode name that represents planning mode.
execution_mode_name: The mode name to switch to on approval.
mode_colors: Optional mapping of mode names to Rich color strings.
"""
self._mode_provider = mode_provider
self._plan_mode_name = plan_mode_name
self._execution_mode_name = execution_mode_name
self._mode_colors = mode_colors or {}
self._text_collector: list[str] = []
def configure_run_options(
self,
options: dict[str, Any],
agent: Agent,
session: Any,
) -> None:
"""Set response_format to PlanningResponse when in plan mode."""
if self._is_planning_mode(session):
options["response_format"] = PlanningResponse
async def on_text(
self,
ux: IUXStateDriver,
text: str,
agent: Agent,
session: Any,
) -> None:
"""Collect text in plan mode; stream through in execute mode."""
if self._is_planning_mode_from_ux(ux):
self._text_collector.append(text)
else:
ux.write_text(escape(text))
async def on_stream_complete(
self,
ux: IUXStateDriver,
agent: Agent,
session: Any,
) -> list[FollowUpAction] | None:
"""Parse collected text as PlanningResponse and build follow-up actions."""
if not self._is_planning_mode_from_ux(ux):
self._text_collector.clear()
return None
collected_text = "".join(self._text_collector)
self._text_collector.clear()
if not collected_text.strip():
return None
# Attempt to deserialize structured response
try:
planning_response = PlanningResponse.model_validate_json(collected_text)
except (json.JSONDecodeError, ValueError):
# JSON parsing failed — fall back to rendering as regular text
ux.write_text(escape(collected_text))
return None
if planning_response.type == PlanningResponseType.CLARIFICATION:
return self._build_clarification_actions(planning_response)
if planning_response.type == PlanningResponseType.APPROVAL:
if not planning_response.questions:
ux.append_info_line("(approval response had no content)", "yellow")
return None
question = planning_response.questions[0]
return [self._build_approval_action(question, session)]
# Unexpected type — fall back to rendering as regular text
ux.write_text(escape(collected_text))
return None
def _is_planning_mode(self, session: Any) -> bool:
"""Check if session is in planning mode."""
from agent_framework import get_agent_mode
try:
current_mode = get_agent_mode(session)
except (AttributeError, TypeError):
return True # No mode provider → treat as planning
return current_mode.lower() == self._plan_mode_name.lower()
def _is_planning_mode_from_ux(self, ux: IUXStateDriver) -> bool:
"""Check if UX is in planning mode."""
current = ux.current_mode
if current is None:
return True
return current.lower() == self._plan_mode_name.lower()
def _build_clarification_actions(
self,
response: PlanningResponse,
) -> list[FollowUpAction]:
"""Build follow-up questions for clarification."""
actions: list[FollowUpAction] = []
for question in response.questions:
prompt = question.message
cont = self._make_clarification_continuation(prompt)
if question.choices:
actions.append(
ChoiceFollowUpQuestion(
prompt=prompt,
choices=question.choices,
allow_custom_text=True,
continuation=cont,
)
)
else:
actions.append(
TextFollowUpQuestion(
prompt=prompt,
continuation=cont,
)
)
return actions
@staticmethod
def _make_clarification_continuation(prompt: str):
"""Create a clarification continuation closure capturing the prompt."""
async def continuation(
answer: str,
ux: IUXStateDriver,
) -> Message | None:
if not answer.strip():
# Escape agent- and user-supplied text so it cannot inject Rich markup
ux.append_info_line(f"🔹 {escape(prompt)}\n └─ (no answer)", "dim")
return None
ux.append_info_line(f"🔹 {escape(prompt)}\n └─ [green]{escape(answer)}[/green]", "dim")
from agent_framework import Message
return Message(role="user", contents=[f"Q: {prompt}\nA: {answer}"])
return continuation
def _build_approval_action(
self,
question: Any,
session: Any,
) -> ChoiceFollowUpQuestion:
"""Build the approval follow-up question."""
approve_option = "Approve and switch to execute mode"
prompt = question.message
async def continuation(
selection: str,
ux: IUXStateDriver,
) -> Message | None:
ux.append_info_line(
f"🔹 {escape(prompt)}\n └─ [green]{escape(selection)}[/green]",
"dim",
)
if selection == approve_option:
from agent_framework import set_agent_mode
set_agent_mode(session, self._execution_mode_name)
exec_color = self._mode_colors.get(self._execution_mode_name)
ux.set_mode(self._execution_mode_name, exec_color)
ux.append_info_line(
f"✅ Switched to {self._execution_mode_name} mode.",
exec_color,
)
from agent_framework import Message
return Message(role="user", contents=["Approved"])
# Custom freeform input — treat as suggested changes
from agent_framework import Message
return Message(role="user", contents=[selection])
return ChoiceFollowUpQuestion(
prompt=prompt,
choices=[approve_option],
allow_custom_text=True,
continuation=continuation,
)
@@ -0,0 +1,80 @@
# Copyright (c) Microsoft. All rights reserved.
"""Reasoning display observer for showing thinking content."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from rich.markup import escape
from .base import ConsoleObserver
if TYPE_CHECKING:
from agent_framework import Agent, Content
from ..state_driver import IUXStateDriver
class ReasoningDisplayObserver(ConsoleObserver):
"""Displays reasoning/thinking content from the agent.
Some models (like o1) provide reasoning steps that show their
internal thought process. This observer displays them with a 💭 prefix
in a dimmed style.
"""
async def on_content(
self,
ux: IUXStateDriver,
content: Content,
agent: Agent,
session: Any,
) -> None:
"""Display reasoning content.
Args:
ux: The UX state driver for UI updates.
content: The content item to check for reasoning.
agent: The AI agent.
session: The agent session.
"""
reasoning_text = self._extract_reasoning(content)
if reasoning_text:
# Display reasoning in dim style to differentiate from main output
ux.append_info_line(f"💭 {escape(reasoning_text)}", "dim")
def _extract_reasoning(self, content: Content) -> str | None:
"""Extract reasoning text from content.
Args:
content: The content item to extract reasoning from.
Returns:
The reasoning text, or None if no reasoning is present.
"""
# Check for reasoning content type
if hasattr(content, "type") and content.type in {"text_reasoning", "reasoning"}:
if hasattr(content, "text"):
return content.text
content_attr = getattr(content, "content", None)
if content_attr:
return str(content_attr)
# Check for reasoning attribute
reasoning = getattr(content, "reasoning", None)
if reasoning is not None:
if isinstance(reasoning, str):
return reasoning
if hasattr(reasoning, "text"):
return reasoning.text
# Check for thinking attribute (alternative name)
thinking = getattr(content, "thinking", None)
if thinking is not None:
if isinstance(thinking, str):
return thinking
if hasattr(thinking, "text"):
return thinking.text
return None
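The lookup order in `_extract_reasoning` can be exercised without the framework. The following is a minimal sketch, assuming content items are plain attribute bags; `extract_reasoning` and the `SimpleNamespace` stand-ins are hypothetical and not part of `agent_framework`.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any


def extract_reasoning(content: Any) -> str | None:
    """Return reasoning text from a content-like object, or None."""
    # 1. Typed reasoning content: prefer .text, then fall back to .content.
    if getattr(content, "type", None) in {"text_reasoning", "reasoning"}:
        if hasattr(content, "text"):
            return content.text
        inner = getattr(content, "content", None)
        if inner:
            return str(inner)
    # 2. A "reasoning" or "thinking" attribute: a plain string or an object with .text.
    for attr in ("reasoning", "thinking"):
        value = getattr(content, attr, None)
        if value is not None:
            if isinstance(value, str):
                return value
            if hasattr(value, "text"):
                return value.text
    return None


# Hypothetical stand-ins for framework content items.
typed = SimpleNamespace(type="reasoning", text="step 1")
nested = SimpleNamespace(type="text", thinking=SimpleNamespace(text="hmm"))
plain = SimpleNamespace(type="text", text="hello")
```

Ordinary text content (`plain`) yields `None`, so the observer stays silent for anything that is not reasoning.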
@@ -0,0 +1,59 @@
# Copyright (c) Microsoft. All rights reserved.
"""Text output observer for streaming agent text."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from rich.markup import escape
from .base import ConsoleObserver
if TYPE_CHECKING:
from agent_framework import Agent
from ..state_driver import IUXStateDriver
class TextOutputObserver(ConsoleObserver):
"""Displays streaming text output from the agent.
Writes text chunks incrementally to the UX state driver as they arrive,
allowing real-time display during streaming.
"""
async def on_text(
self,
ux: IUXStateDriver,
text: str,
agent: Agent,
session: Any,
) -> None:
"""Write each text chunk directly to the UX driver.
Args:
ux: The UX state driver for UI updates.
text: The text chunk to display.
agent: The AI agent.
session: The agent session.
"""
ux.write_text(escape(text))
async def on_stream_complete(
self,
ux: IUXStateDriver,
agent: Agent,
session: Any,
) -> list | None:
"""No-op on stream complete (state managed by UX driver).
Args:
ux: The UX state driver for UI updates.
agent: The AI agent.
session: The agent session.
Returns:
None (no follow-up actions).
"""
return None
@@ -0,0 +1,139 @@
# Copyright (c) Microsoft. All rights reserved.
"""Tool approval observer for user confirmation of tool calls.
Detects function_approval_request content items during streaming, displays
approval notifications, and after the stream completes presents one
ChoiceFollowUpQuestion per pending approval request.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from ..app_state import ChoiceFollowUpQuestion, FollowUpAction
from .base import ConsoleObserver
if TYPE_CHECKING:
from agent_framework import Agent, Content, Message
from ..state_driver import IUXStateDriver
class ToolApprovalObserver(ConsoleObserver):
"""Asks user to approve tool calls before execution.
Collects `function_approval_request` content during streaming and presents
a multi-choice approval question for each after the stream completes.
The continuation builds a `function_approval_response` Content to inject
into the next agent turn.
"""
def __init__(self) -> None:
"""Initialize the tool approval observer."""
self._approval_requests: list[Content] = []
async def on_content(
self,
ux: IUXStateDriver,
content: Content,
agent: Agent,
session: Any,
) -> None:
"""Collect function_approval_request content for approval.
Args:
ux: The UX state driver for UI updates.
content: The content item to check.
agent: The AI agent.
session: The agent session.
"""
if content.type == "function_approval_request":
self._approval_requests.append(content)
tool_name = self._format_tool_name(content)
ux.append_info_line(f"⚠️ Approval needed: {tool_name}", "yellow")
async def on_stream_complete(
self,
ux: IUXStateDriver,
agent: Agent,
session: Any,
) -> list[FollowUpAction] | None:
"""Build approval questions for collected requests.
Args:
ux: The UX state driver for UI updates.
agent: The AI agent.
session: The agent session.
Returns:
List of ChoiceFollowUpQuestions, one per approval request.
"""
if not self._approval_requests:
return None
actions: list[FollowUpAction] = []
for request in self._approval_requests:
actions.append(self._build_approval_question(request))
self._approval_requests.clear()
return actions
def _build_approval_question(self, request: Content) -> ChoiceFollowUpQuestion:
"""Build a multi-choice approval question for a single request."""
tool_name = self._format_tool_name(request)
prompt = f"🔐 Tool approval: {tool_name}"
# TODO(westey-m): Add "Always approve" options when the framework supports
# CreateAlwaysApproveToolResponse / CreateAlwaysApproveToolWithArgumentsResponse.
choices = [
"Approve this call",
"Deny",
]
async def continuation(
selection: str,
ux: IUXStateDriver,
) -> Message | None:
from agent_framework import Message
if selection == "Deny":
response_content = request.to_function_approval_response(approved=False)
action_label = "❌ Denied"
color = "red"
else:
response_content = request.to_function_approval_response(approved=True)
action_label = "✅ Approved"
color = "green"
ux.append_info_line(
f"🔹 {prompt}\n └─ [{color}]{action_label}[/{color}]",
"dim",
)
return Message(role="user", contents=[response_content])
return ChoiceFollowUpQuestion(
prompt=prompt,
choices=choices,
allow_custom_text=False,
continuation=continuation,
)
@staticmethod
def _format_tool_name(content: Content) -> str:
"""Extract a readable tool name from approval request content."""
# The function_call is stored on the approval request content
function_call = getattr(content, "function_call", None)
if function_call is not None:
from ..formatters import build_default_formatters, format_tool_call
try:
return format_tool_call(build_default_formatters(), function_call)
except (AttributeError, TypeError):
pass
# Fall back to name attribute
name = getattr(function_call, "name", None)
if name:
return str(name)
return "unknown tool"
@@ -0,0 +1,53 @@
# Copyright (c) Microsoft. All rights reserved.
"""Tool call display observer using formatters."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from ..formatters import build_default_formatters, format_tool_call
from .base import ConsoleObserver
if TYPE_CHECKING:
from agent_framework import Agent, Content
from ..formatters import ToolCallFormatter
from ..state_driver import IUXStateDriver
class ToolCallDisplayObserver(ConsoleObserver):
"""Displays tool call notifications using formatters.
Shows tool calls with a 🔧 prefix and uses the formatter system to
display them in a user-friendly format.
"""
def __init__(self, formatters: list[ToolCallFormatter] | None = None) -> None:
"""Initialize the tool call display observer.
Args:
formatters: Optional list of tool formatters. If None, uses
default formatters from build_default_formatters().
"""
self._formatters = formatters or build_default_formatters()
async def on_content(
self,
ux: IUXStateDriver,
content: Content,
agent: Agent,
session: Any,
) -> None:
"""Display function call content.
Args:
ux: The UX state driver for UI updates.
content: The content item to check for function calls.
agent: The AI agent.
session: The agent session.
"""
# Check if this is a function call content type
if content.type == "function_call":
formatted = format_tool_call(self._formatters, content)
ux.append_info_line(f"🔧 {formatted}", "yellow")
@@ -0,0 +1,56 @@
# Copyright (c) Microsoft. All rights reserved.
"""Usage display observer for token usage statistics."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from .base import ConsoleObserver
if TYPE_CHECKING:
from agent_framework import Agent
from ..state_driver import IUXStateDriver
class UsageDisplayObserver(ConsoleObserver):
"""Displays token usage counts reported by the API.
Updates the usage text as soon as a usage Content item arrives during
streaming, before the runner processes the final response.
The display shows raw input, output, and total token counts.
"""
async def on_content(
self,
ux: IUXStateDriver,
content: Any,
agent: Agent,
session: Any,
) -> None:
"""Update usage display immediately when usage content arrives.
Args:
ux: The UX state driver for UI updates.
content: A content item from the response.
agent: The AI agent.
session: The agent session.
"""
if getattr(content, "type", None) == "usage":
usage_details = getattr(content, "usage_details", None)
if isinstance(usage_details, dict):
# Format the counts here and hand the resulting text to the state driver
ux.set_usage_text(self._format_from_details(usage_details))
@staticmethod
def _format_from_details(usage: dict) -> str:
"""Format usage details dict into display text.
This is a fallback formatter for when usage arrives as Content
before the runner's final response processing.
"""
input_tokens = usage.get("input_token_count", 0) or 0
output_tokens = usage.get("output_token_count", 0) or 0
total_tokens = usage.get("total_token_count", 0) or (input_tokens + output_tokens)
return f"📊 Tokens — input: {input_tokens:,} | output: {output_tokens:,} | total: {total_tokens:,}"
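The fallback behaviour of `_format_from_details` is easiest to see with concrete inputs. This is a simplified sketch, assuming the same dictionary keys as above; the standalone `format_usage` name is hypothetical and the emoji prefix is omitted.

```python
def format_usage(usage: dict) -> str:
    """Format token counts, deriving the total when the API omits it."""
    # "or 0" also covers keys that are present but set to None.
    input_tokens = usage.get("input_token_count", 0) or 0
    output_tokens = usage.get("output_token_count", 0) or 0
    # A missing or zero total falls back to input + output.
    total_tokens = usage.get("total_token_count", 0) or (input_tokens + output_tokens)
    return f"input: {input_tokens:,} | output: {output_tokens:,} | total: {total_tokens:,}"


derived = format_usage({"input_token_count": 1200, "output_token_count": 856})
reported = format_usage({"input_token_count": None, "total_token_count": 10})
print(derived)   # input: 1,200 | output: 856 | total: 2,056
print(reported)  # input: 0 | output: 0 | total: 10
```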
@@ -0,0 +1,338 @@
# Copyright (c) Microsoft. All rights reserved.
"""State driver interface for UI updates.
This module defines the IUXStateDriver Protocol, which observers use to
update the UI during agent streaming, and SimpleConsoleStateDriver, a
print-based implementation for testing. The Textual implementation lives in
textual_state_driver.py.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Protocol
if TYPE_CHECKING:
from agent_framework import AgentSession
from .app_state import FollowUpAction
class IUXStateDriver(Protocol):
"""Protocol for UI state driver.
Observers call these methods to update the UI during agent streaming.
The Textual implementation is HarnessConsoleUXStateDriver in textual_state_driver.py.
The state driver acts as a controller between the agent framework (model)
and the Textual UI components (view), coordinating all UI updates.
"""
def append_info_line(self, text: str, color: str | None = None) -> None:
"""Append an informational line to the output.
Used for displaying tool calls, errors, warnings, and other
informational messages that aren't part of the agent's text response.
Args:
text: The text to display.
color: Optional Rich color string (e.g., "yellow", "red", "dim").
"""
...
def append_stream_footer(self, text: str) -> None:
"""Append a footer line after streaming ends.
Used for displaying final status messages like "(no text response)"
or other closing information.
Args:
text: The footer text to display.
"""
...
def begin_streaming(self) -> None:
"""Begin streaming mode.
Switches the bottom panel to streaming mode (shows "Streaming..." indicator),
starts the spinner animation, and prepares for streaming text updates.
"""
...
def update_streaming_text(self, accumulated_text: str) -> None:
"""Update the accumulated streaming text.
Called repeatedly during streaming to update the displayed text as
new chunks arrive from the agent. The text should accumulate across
multiple calls.
Args:
accumulated_text: The full accumulated text so far.
"""
...
def write_text(self, text: str, color: str | None = None) -> None:
"""Write a streaming text chunk incrementally.
Appends the text to the current streaming entry. If the streaming
entry is no longer the last output item (e.g., an info_line was
inserted), creates a new streaming entry.
Args:
text: The text chunk to append.
color: Optional Rich color string.
"""
...
def end_streaming(self) -> None:
"""End streaming mode.
Stops the spinner, switches the bottom panel back to text input mode,
and finalizes the streaming output.
"""
...
def enqueue_follow_up_action(self, action: FollowUpAction) -> None:
"""Add a follow-up action to the queue.
Follow-up actions can be questions to ask the user or messages to
inject into the next agent turn. The state driver queues these and
processes them after streaming completes.
Args:
action: The follow-up action to queue.
"""
...
def has_pending_questions(self) -> bool:
"""Check if there are pending follow-up questions awaiting user answers.
Returns:
True if there are unanswered questions in the queue.
"""
...
def take_follow_up_responses(self) -> list:
"""Take and clear all accumulated follow-up response messages.
Returns:
List of Message objects accumulated from follow-up actions.
"""
...
async def write_no_text_warning(self, has_follow_up_actions: bool) -> None:
"""Write a warning if the agent produced no text output.
Called after streaming completes. If no text was received and no
follow-up actions exist, writes a "(no text response)" footer.
Args:
has_follow_up_actions: Whether follow-up actions exist.
"""
...
def set_mode(self, mode: str | None, mode_color: str | None = None) -> None:
"""Set the current agent mode.
Updates the mode indicator in the UI (e.g., "[plan]", "[execute]")
with the specified color.
Args:
mode: The mode name (e.g., "plan", "execute"), or None to hide.
mode_color: Optional Rich color string for the mode label.
"""
...
def set_show_spinner(self, show: bool) -> None:
"""Show or hide the spinner animation.
The spinner provides visual feedback that the agent is processing.
Args:
show: True to show the spinner, False to hide it.
"""
...
def set_usage_text(self, usage_text: str | None) -> None:
"""Set the token usage text.
Displays token usage statistics (e.g., "1.2K in / 856 out") in
the status bar.
Args:
usage_text: The formatted usage text, or None to hide.
"""
...
@property
def current_mode(self) -> str | None:
"""Get the current agent mode.
Returns:
The current mode name, or None if no mode is set.
"""
...
def begin_streaming_output(self) -> None:
"""Reset per-turn streaming bookkeeping.
Called at the start of each agent turn to reset streaming state
(e.g., clear accumulated text, reset flags).
"""
...
def write_user_input_echo(self, text: str) -> None:
"""Echo user input to the output area.
Displays the user's submitted input in the conversation history,
typically with a "You: " prefix.
Args:
text: The user's input text.
"""
...
def request_shutdown(self) -> None:
"""Request the application to shut down.
Called by the /exit command handler to signal that the user
wants to quit the console.
"""
...
def replace_session(self, session: AgentSession) -> None:
"""Replace the current agent session.
Called by the /session-import command handler to swap the
active session with one loaded from a file.
Args:
session: The new session to use.
"""
...
class SimpleConsoleStateDriver:
"""Simple console-based state driver for testing.
This is a minimal implementation that logs all operations to the console.
Useful for testing the agent runner without a full UI.
"""
def __init__(self) -> None:
"""Initialize the simple state driver."""
self._streaming = False
self._spinner_visible = False
self._current_mode: str | None = None
print("[SimpleConsoleStateDriver initialized]")
def append_info_line(self, text: str, color: str | None = None) -> None:
"""Append an informational line to the output."""
# Only emit the separating space when a color tag is present
color_prefix = f"[{color}] " if color else ""
print(f"{color_prefix}{text}")
def append_stream_footer(self, text: str) -> None:
"""Append a footer line after streaming ends."""
print(f"[Footer] {text}")
async def write_info_line(self, text: str, color: str | None = None) -> None:
"""Async version of append_info_line."""
self.append_info_line(text, color)
def write_user_input_echo(self, text: str) -> None:
"""Echo user input to the output."""
print(f"\n[User] {text}\n")
def begin_streaming(self) -> None:
"""Begin streaming mode."""
self._streaming = True
print("[▶ Streaming started]")
def begin_streaming_output(self) -> None:
"""Begin streaming output to the scroll panel."""
print("[▶ Streaming output started]")
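def update_streaming_text(self, accumulated_text: str) -> None:
"""Update the currently streaming text."""
# Parameter name matches IUXStateDriver so keyword calls work on both drivers
# Truncate for readability
display_text = accumulated_text[:80] + "..." if len(accumulated_text) > 80 else accumulated_text
print(f"[Assistant] {display_text}", end="", flush=True)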
def write_text(self, text: str, color: str | None = None) -> None:
"""Write a streaming text chunk."""
print(text, end="", flush=True)
async def end_streaming_output(self) -> None:
"""End streaming output."""
print("\n[▪ Streaming output ended]")
def end_streaming(self) -> None:
"""End streaming mode."""
self._streaming = False
print("[▪ Streaming ended]")
def set_show_spinner(self, show: bool) -> None:
"""Show or hide the spinner."""
self._spinner_visible = show
status = "visible" if show else "hidden"
print(f"[Spinner: {status}]")
def set_mode(self, mode: str | None, mode_color: str | None = None) -> None:
"""Set the current mode text."""
self._current_mode = mode
color_str = f" ({mode_color})" if mode_color else ""
print(f"[Mode: {mode or 'default'}{color_str}]")
@property
def current_mode(self) -> str | None:
"""Get the current agent mode."""
return self._current_mode
def set_usage_text(self, usage_text: str | None) -> None:
"""Set the usage display text."""
if usage_text:
print(f"[Usage: {usage_text}]")
def enqueue_follow_up_action(self, action) -> None:
"""Enqueue a follow-up action.
Args:
action: The follow-up action to enqueue.
"""
action_type = type(action).__name__
print(f"[Follow-up queued: {action_type}]")
def has_pending_questions(self) -> bool:
"""Check if there are pending follow-up questions."""
return False
def take_follow_up_responses(self) -> list:
"""Take and clear all accumulated follow-up responses."""
return []
async def write_no_text_warning(self, has_follow_up_actions: bool) -> None:
"""Write a warning if no text was produced."""
if not has_follow_up_actions:
print("[▪ (no text response from agent)]")
def update_last_entry(self, entry_type, new_text: str) -> None:
"""Update the last output entry (placeholder for now).
Args:
entry_type: The type of entry to update.
new_text: The new text content.
"""
# Simplified: just print the update
display_text = new_text[:80] + "..." if len(new_text) > 80 else new_text
print(f"[Update last entry: {display_text}]", flush=True)
def request_shutdown(self) -> None:
"""Request application shutdown."""
print("[Shutdown requested]")
def replace_session(self, session) -> None:
"""Replace the active session.
Args:
session: The new session to use.
"""
print(f"[Session replaced: {getattr(session, 'id', 'unknown')}]")
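Because `IUXStateDriver` is a `Protocol`, any object with matching methods can stand in for the UI, which makes observers straightforward to unit test. The following is a minimal sketch, assuming a two-method slice of the protocol; `InfoDriver`, `RecordingDriver`, and the free-standing `on_text` are hypothetical names used only for illustration.

```python
from __future__ import annotations

import asyncio
from typing import Protocol


class InfoDriver(Protocol):
    """Hypothetical two-method slice of IUXStateDriver."""

    def append_info_line(self, text: str, color: str | None = None) -> None: ...

    def write_text(self, text: str, color: str | None = None) -> None: ...


class RecordingDriver:
    """Records calls instead of rendering them; no inheritance is needed."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str, str | None]] = []

    def append_info_line(self, text: str, color: str | None = None) -> None:
        self.calls.append(("info", text, color))

    def write_text(self, text: str, color: str | None = None) -> None:
        self.calls.append(("text", text, color))


async def on_text(ux: InfoDriver, text: str) -> None:
    """Stand-in for an observer callback that forwards a text chunk."""
    ux.write_text(text)


driver = RecordingDriver()
asyncio.run(on_text(driver, "hello"))
driver.append_info_line("tool call", "yellow")
```

Structural typing means `RecordingDriver` satisfies `InfoDriver` for a type checker without subclassing it, which is the same relationship `SimpleConsoleStateDriver` has with the full protocol.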
@@ -0,0 +1,400 @@
# Copyright (c) Microsoft. All rights reserved.
"""Textual-based UX state driver implementation.
This module provides the full HarnessConsoleUXStateDriver that connects
the agent runner and observers to the Textual UI components. It mutates
the application state and triggers UI updates through the Textual app.
"""
from __future__ import annotations
from collections.abc import Callable
from typing import TYPE_CHECKING
from .app_state import (
BottomPanelMode,
ChoiceFollowUpQuestion,
FollowUpAction,
FollowUpMessage,
FollowUpQuestion,
HarnessAppState,
OutputEntry,
OutputEntryType,
)
if TYPE_CHECKING:
from agent_framework import Message
# Default mode colors (mode name -> Rich color string)
DEFAULT_MODE_COLORS: dict[str, str] = {
"plan": "cyan",
"execute": "green",
"review": "yellow",
"default": "blue",
}
def get_mode_color(mode: str | None, mode_colors: dict[str, str] | None = None) -> str:
"""Get the color for a mode name.
Args:
mode: The mode name.
mode_colors: Optional custom mode color mapping.
Returns:
A Rich color string for the mode.
"""
colors = mode_colors or DEFAULT_MODE_COLORS
if mode is None:
return colors.get("default", "blue")
return colors.get(mode, colors.get("default", "blue"))
class HarnessConsoleUXStateDriver:
"""Full Textual-based UX state driver.
Implements the IUXStateDriver protocol by mutating application state
and calling back into the Textual app to trigger UI updates.
The driver owns the output entry list and streaming state, and produces
state snapshots that the app uses to render the UI.
"""
def __init__(
self,
app_state: HarnessAppState,
on_state_changed: Callable[[], None],
mode_colors: dict[str, str] | None = None,
) -> None:
"""Initialize the state driver.
Args:
app_state: The application state object to mutate.
on_state_changed: Callback invoked after state changes to trigger UI refresh.
mode_colors: Optional mapping of mode names to Rich color strings.
"""
self._state = app_state
self._on_state_changed = on_state_changed
self._mode_colors = mode_colors
# Streaming bookkeeping
self._has_received_any_text = False
self._current_streaming_entry: OutputEntry | None = None
self._current_streaming_entry_index: int = -1
self._last_entry_type: OutputEntryType | None = None
@property
def state(self) -> HarnessAppState:
"""Get the current application state."""
return self._state
@property
def current_mode(self) -> str | None:
"""Get the current agent mode."""
return self._state.mode_text
@current_mode.setter
def current_mode(self, value: str | None) -> None:
"""Set the current agent mode."""
self._state.mode_text = value
self._state.mode_color = get_mode_color(value, self._mode_colors)
self._notify()
# --- Streaming lifecycle ---
def begin_streaming(self) -> None:
"""Begin streaming mode - switch bottom panel and show spinner."""
self._state.mode = BottomPanelMode.STREAMING
self._state.show_spinner = True
self._state.input_enabled = False
self._notify()
def begin_streaming_output(self) -> None:
"""Reset per-turn streaming bookkeeping."""
self._has_received_any_text = False
self._current_streaming_entry = None
self._current_streaming_entry_index = -1
def end_streaming(self) -> None:
"""End streaming mode - return to text input."""
self._state.mode = BottomPanelMode.TEXT_INPUT
self._state.show_spinner = False
self._state.input_enabled = True
self._notify()
async def end_streaming_output(self) -> None:
"""Finalize streaming output - release the current streaming entry if text was received."""
if self._has_received_any_text:
self._current_streaming_entry = None
self._last_entry_type = OutputEntryType.STREAM_FOOTER
self._notify()
def set_show_spinner(self, show: bool) -> None:
"""Show or hide the spinner."""
self._state.show_spinner = show
self._notify()
# --- Text output ---
def write_user_input_echo(self, text: str) -> None:
"""Echo user input to the output area."""
entry = OutputEntry(
type=OutputEntryType.USER_INPUT,
text=f"You: {text}",
color="green",
)
self._append_entry(entry)
self._last_entry_type = OutputEntryType.USER_INPUT
self._notify()
def append_info_line(self, text: str, color: str | None = None) -> None:
"""Append an informational line to the output."""
effective_color = color or get_mode_color(self._state.mode_text, self._mode_colors)
# No separator is needed after streaming text; Textual handles spacing via widget layout
entry = OutputEntry(
type=OutputEntryType.INFO_LINE,
text=text,
color=effective_color,
)
self._append_entry(entry)
self._last_entry_type = OutputEntryType.INFO_LINE
self._notify()
def append_stream_footer(self, text: str) -> None:
"""Append a footer line after streaming ends."""
entry = OutputEntry(
type=OutputEntryType.STREAM_FOOTER,
text=text,
color="dim",
)
self._append_entry(entry)
self._last_entry_type = OutputEntryType.STREAM_FOOTER
self._notify()
async def write_info_line(self, text: str, color: str | None = None) -> None:
"""Async version of append_info_line."""
self.append_info_line(text, color)
def write_text(self, text: str, color: str | None = None) -> None:
"""Write streaming text from the agent.
Accumulates text into the current streaming entry. If the streaming
entry is still the last output item, appends to it in place. Otherwise
starts a new streaming entry.
Args:
text: The text chunk to append.
color: Optional Rich color.
"""
self._last_entry_type = OutputEntryType.STREAMING_TEXT
self._has_received_any_text = True
effective_color = color or get_mode_color(self._state.mode_text, self._mode_colors)
if (
self._current_streaming_entry is not None
and self._current_streaming_entry_index == len(self._state.output_entries) - 1
):
# Append to existing streaming entry in place
self._current_streaming_entry.text += text
# Update the entry in the list (same object, but trigger notify)
else:
# Start a fresh streaming entry
self._current_streaming_entry = OutputEntry(
type=OutputEntryType.STREAMING_TEXT,
text=text,
color=effective_color,
)
self._state.output_entries.append(self._current_streaming_entry)
self._current_streaming_entry_index = len(self._state.output_entries) - 1
self._notify()
def update_streaming_text(self, accumulated_text: str) -> None:
"""Update the accumulated streaming text (full replacement).
Alternative to write_text() - replaces the entire streaming entry text.
If an info_line was appended after the streaming entry (e.g., a tool
call), creates a new streaming entry at the end of the list so the
UI can render it.
Args:
accumulated_text: The full accumulated text so far.
"""
effective_color = get_mode_color(self._state.mode_text, self._mode_colors)
if (
self._current_streaming_entry is not None
and self._current_streaming_entry_index == len(self._state.output_entries) - 1
):
# Streaming entry is still the last entry — update in place
self._current_streaming_entry.text = accumulated_text
else:
# Either no current entry, or it's no longer at the end (an
# info_line was appended after it). Create a new streaming entry
# so the panel can render the continued text.
self._current_streaming_entry = OutputEntry(
type=OutputEntryType.STREAMING_TEXT,
text=accumulated_text,
color=effective_color,
)
self._state.output_entries.append(self._current_streaming_entry)
self._current_streaming_entry_index = len(self._state.output_entries) - 1
self._last_entry_type = OutputEntryType.STREAMING_TEXT
self._has_received_any_text = True
self._notify()
async def write_no_text_warning(self, has_follow_up_actions: bool) -> None:
"""Write '(no text response)' warning if no text was received."""
if not self._has_received_any_text and not has_follow_up_actions:
self.append_stream_footer("(no text response from agent)")
# --- Usage and mode ---
def set_usage_text(self, usage_text: str | None) -> None:
"""Set the token usage text."""
self._state.usage_text = usage_text
self._notify()
def set_mode(self, mode: str | None, mode_color: str | None = None) -> None:
"""Set the current mode."""
self._state.mode_text = mode
self._state.mode_color = mode_color or get_mode_color(mode, self._mode_colors)
self._notify()
# --- Follow-up actions ---
def enqueue_follow_up_action(self, action: FollowUpAction) -> None:
"""Enqueue a follow-up action."""
if isinstance(action, FollowUpMessage):
self._state.accumulated_follow_up_responses.append(action.message)
elif isinstance(action, FollowUpQuestion):
self.queue_follow_up_questions([action])
def queue_follow_up_questions(self, questions: list[FollowUpQuestion]) -> None:
"""Queue follow-up questions for user interaction.
Args:
questions: List of questions to queue.
"""
if not questions:
return
was_empty = len(self._state.pending_questions) == 0
self._state.pending_questions.extend(questions)
if was_empty:
self._configure_for_head_question(self._state.pending_questions[0])
self._notify()
def add_follow_up_response(self, response: Message) -> None:
"""Add a follow-up response message."""
self._state.accumulated_follow_up_responses.append(response)
def advance_follow_up_question(self) -> None:
"""Advance to the next follow-up question.
Removes the head question from the queue. If more questions remain,
configures the UI for the next one. Otherwise returns to text input.
"""
if not self._state.pending_questions:
return
self._state.pending_questions.pop(0)
if self._state.pending_questions:
self._configure_for_head_question(self._state.pending_questions[0])
else:
# No more questions - return to text input
self._state.mode = BottomPanelMode.TEXT_INPUT
self._state.list_selection_options = []
self._state.list_selection_title = None
self._state.list_selection_custom_text_placeholder = None
self._state.list_selection_index = 0
self._state.list_selection_custom_input_text = ""
self._notify()
def take_follow_up_responses(self) -> list[Message]:
"""Take and clear all accumulated follow-up responses.
Returns:
List of accumulated response messages.
"""
responses = list(self._state.accumulated_follow_up_responses)
self._state.accumulated_follow_up_responses.clear()
return responses
def has_pending_questions(self) -> bool:
"""Check if there are pending follow-up questions.
Returns:
True if unanswered questions exist in the queue.
"""
return len(self._state.pending_questions) > 0
# --- Queued messages (message injection) ---
def set_queued_messages(self, pending: list[str]) -> None:
"""Set the queued message display.
Args:
pending: List of pending message texts.
"""
self._state.queued_items = [f"💬 {text}" for text in pending]
self._notify()
# --- Internal helpers ---
def _append_entry(self, entry: OutputEntry) -> None:
"""Append an output entry to the state."""
self._state.output_entries.append(entry)
def _configure_for_head_question(self, question: FollowUpQuestion) -> None:
"""Configure the UI for the current head question.
Args:
question: The question to display.
"""
if isinstance(question, ChoiceFollowUpQuestion):
self._state.mode = BottomPanelMode.LIST_SELECTION
self._state.list_selection_options = list(question.choices)
self._state.list_selection_title = question.prompt
self._state.list_selection_custom_text_placeholder = (
"✏️ Type a custom response..." if question.allow_custom_text else None
)
self._state.list_selection_index = 0
self._state.list_selection_custom_input_text = ""
else:
# Text question - show as info line and switch to text input
self.append_info_line(question.prompt)
self._state.mode = BottomPanelMode.TEXT_INPUT
self._state.list_selection_options = []
self._state.list_selection_title = None
def _notify(self) -> None:
"""Notify the app that state has changed."""
self._on_state_changed()
def request_shutdown(self) -> None:
"""Request the application to shut down."""
self._state.shutdown_requested = True
self._notify()
def replace_session(self, session) -> None:
"""Replace the current agent session.
Args:
session: The new AgentSession to use.
"""
self._state.replaced_session = session
self._notify()
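The in-place append versus new-entry decision in `write_text` can be reproduced in isolation. This is a minimal sketch under simplifying assumptions: `Entry` and `MiniDriver` are hypothetical stand-ins for `OutputEntry` and `HarnessConsoleUXStateDriver`, with colors and notifications left out.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Entry:
    kind: str
    text: str


@dataclass
class MiniDriver:
    entries: list[Entry] = field(default_factory=list)
    stream_index: int = -1  # index of the current streaming entry, -1 if none

    def write_text(self, text: str) -> None:
        # Append in place only while the streaming entry is still the last entry.
        if self.stream_index >= 0 and self.stream_index == len(self.entries) - 1:
            self.entries[self.stream_index].text += text
        else:
            self.entries.append(Entry("stream", text))
            self.stream_index = len(self.entries) - 1

    def append_info_line(self, text: str) -> None:
        self.entries.append(Entry("info", text))


driver = MiniDriver()
driver.write_text("Hel")
driver.write_text("lo")              # merged into the first streaming entry
driver.append_info_line("tool call")  # interrupts the stream
driver.write_text("Done")            # starts a second streaming entry
result = [(e.kind, e.text) for e in driver.entries]
```

Tracking the index rather than only the entry object is what lets the driver notice that an info line was appended after the streaming entry.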
@@ -1,6 +1,19 @@
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "agent-framework",
# "textual>=6.2.1",
# "rich>=13.7.1",
# "azure-identity",
# "python-dotenv",
# ]
# ///
# Run with any PEP 723 compatible runner, e.g.:
# uv run samples/02-agents/harness/harness_research.py
# Copyright (c) Microsoft. All rights reserved.
"""Harness Research Assistant.
"""Harness Research Assistant with Console UI.
Demonstrates ``create_harness_agent`` — a factory function that builds a
pre-configured agent with batteries included, automatically wiring up function
@@ -16,12 +29,9 @@ context providers:
- **Web Search** — real-time web search via ``get_web_search_tool()``
The sample creates a research-focused agent with web search capability and runs
a simple interactive chat loop. The agent will plan research tasks using todos,
switch between plan and execute modes, search the web for current information,
and track its progress.
Special commands:
/exit — End the session.
it inside the Textual-based harness console. The agent will plan research tasks
using todos, switch between plan and execute modes, search the web for current
information, and track its progress.
Environment variables:
    FOUNDRY_PROJECT_ENDPOINT — Azure AI Foundry project endpoint URL
@@ -36,19 +46,24 @@ import asyncio
from agent_framework import create_harness_agent
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
from console import build_observers_with_planning, run_agent_async
from dotenv import load_dotenv
RESEARCH_INSTRUCTIONS = """\
## Research Assistant Instructions
You are a research assistant. When given a research topic, research it thoroughly using web search and web browsing.
Use your knowledge to form good search queries and hypotheses, but always verify claims with the tools available to you rather than relying on memory alone.
You are a research assistant. When given a research topic, research it
thoroughly using web search and web browsing. Use your knowledge to form good
search queries and hypotheses, but always verify claims with the tools
available to you rather than relying on memory alone.
### Research quality
Consult multiple sources when possible and cross-reference key claims.
When sources disagree, note the discrepancy and explain which source you consider more reliable and why.
If a web page fails to load or a search returns irrelevant results, try alternative search queries or sources before moving on.
When sources disagree, note the discrepancy and explain which source you
consider more reliable and why.
If a web page fails to load or a search returns irrelevant results, try
alternative search queries or sources before moving on.
Track your sources — you will need them when presenting results.
### Presenting results
@@ -58,7 +73,8 @@ When presenting your final findings:
- Use clear sections with headings for each major topic or sub-question.
- Cite your sources inline (e.g., "According to [source name](URL), ...").
- End with a brief summary of key takeaways.
- In addition to returning the results to the user, save the final research report to file memory so it survives compaction and can be referenced later.
- In addition to returning the results to the user, save the final research
report to file memory so it survives compaction and can be referenced later.
"""
@@ -82,64 +98,17 @@ async def main() -> None:
        agent_instructions=RESEARCH_INSTRUCTIONS,
    )

    # Create a session to maintain conversation state across turns.
    session = agent.create_session()

    print("Research Assistant (powered by create_harness_agent)")
    print("=" * 50)
    print("Enter a research topic to get started.")
    print("Type /exit to end the session.\n")

    # Simple interactive chat loop.
    while True:
        user_input = input("You: ").strip()
        if not user_input:
            continue
        if user_input.lower() == "/exit":
            print("\nGoodbye!")
            break

        # Run the agent with streaming and print the response as it arrives.
        print("\nAssistant: ", end="", flush=True)
        async for update in agent.run(user_input, session=session, stream=True):
            if update.contents:
                for content in update.contents:
                    # Print a brief message for each tool call in the stream.
                    if content.type == "function_call":
                        print(f"\n [calling tool: {content.name}]", flush=True)
                        print(" ", end="", flush=True)
                    # Show web search activity when the result arrives with action details.
                    elif (
                        content.type in ("search_tool_call", "search_tool_result")
                        and getattr(content, "tool_name", None) == "web_search"
                    ):
                        action = None
                        if content.type == "search_tool_result" and isinstance(content.result, dict):
                            action = content.result.get("action", {})
                        elif content.type == "search_tool_call":
                            action = content.arguments if isinstance(content.arguments, dict) else None
                        if action:
                            action_type = action.get("type", "search")
                            if action_type == "search":
                                queries = action.get("queries") or []
                                query_str = ", ".join(f'"{q}"' for q in queries) if queries else action.get("query", "")
                                print(f"\n 🌐 Web search: {query_str}", flush=True)
                                print(" ", end="", flush=True)
                            elif action_type == "open_page":
                                url = action.get("url", "(unknown)")
                                print(f"\n 🌐 Opening: {url}", flush=True)
                                print(" ", end="", flush=True)
                            elif action_type == "find_in_page":
                                pattern = action.get("pattern", "")
                                print(f'\n 🌐 Find in page: "{pattern}"', flush=True)
                                print(" ", end="", flush=True)
                            else:
                                print(f"\n 🌐 Web search: {action_type}", flush=True)
                                print(" ", end="", flush=True)
            # Print text content as it streams in.
            if update.text:
                print(update.text, end="", flush=True)
        print("\n")
    # Run the harness console with the research agent.
    await run_agent_async(
        agent,
        session=agent.create_session(),
        observers=build_observers_with_planning(agent),
        initial_mode="plan",
        title="🔬 Research Assistant",
        placeholder="Enter a research topic...",
        max_context_window_tokens=128_000,
        max_output_tokens=16_384,
    )
if __name__ == "__main__":